Code integration with the abstractions

Changes needed to integrate the existing code with the data source
and job binary abstractions.

Change-Id: I524f25ac95bb634b0583113792460c17217acc34
Implements: blueprint data-source-plugin
Marianne Linhares Monteiro 2017-01-31 18:48:43 -03:00
parent bd9dc1a21c
commit 2def30f412
41 changed files with 394 additions and 717 deletions
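
The recurring pattern in this change is replacing direct calls into
sahara.service.edp.binary_retrievers.dispatch with lookups through the new job
binary and data source managers. A minimal sketch of the new call shape, using
only names that appear in the diffs below (the job_binary/data_source objects,
the remote handle and proxy_configs are assumed to come from the calling
engine):

    from sahara import context
    from sahara.service.edp.data_sources import manager as ds_manager
    from sahara.service.edp.job_binaries import manager as jb_manager

    def copy_binary(job_binary, r, proxy_configs):
        # Resolve the binary type from the URL scheme and let it place the
        # raw data on the cluster; the return value is a cluster-local path.
        jb_type = jb_manager.JOB_BINARIES.get_job_binary_by_url(job_binary.url)
        return jb_type.copy_binary_to_cluster(job_binary,
                                              proxy_configs=proxy_configs,
                                              remote=r,
                                              context=context.ctx())

    def data_source_urls(data_source, cluster, job_exec_id):
        # Each data source type builds its own (native_url, runtime_url) pair.
        ds_type = ds_manager.DATA_SOURCES.get_data_source(data_source.type)
        return ds_type.get_urls(data_source.url, cluster, job_exec_id)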

View File

@ -22,7 +22,7 @@ from oslo_config import cfg
from sahara.conductor import resource as r
from sahara.db import base as db_base
from sahara.service.castellan import utils as key_manager
from sahara.service import shares
from sahara.service.edp.utils import shares
from sahara.utils import configs
from sahara.utils import crypto

View File

@ -15,9 +15,10 @@
import os
from sahara import context
import sahara.plugins.mapr.util.maprfs_helper as mfs
import sahara.plugins.mapr.versions.version_handler_factory as vhf
import sahara.service.edp.binary_retrievers.dispatch as d
from sahara.service.edp.job_binaries import manager as jb_manager
import sahara.service.edp.oozie.engine as e
from sahara.utils import edp
@ -52,16 +53,23 @@ class MapROozieJobEngine(e.OozieJobEngine):
with where.remote() as r:
for m in mains:
raw_data = d.get_raw_binary(m, proxy_configs)
mfs.put_file_to_maprfs(r, raw_data, m.name, job_dir, hdfs_user)
uploaded_paths.append(os.path.join(job_dir, m.name))
path = jb_manager.JOB_BINARIES. \
get_job_binary_by_url(m.url). \
copy_binary_to_cluster(m, proxy_configs=proxy_configs,
remote=r, context=context.ctx())
target = os.path.join(job_dir, m.name)
mfs.copy_from_local(r, path, target, hdfs_user)
uploaded_paths.append(target)
if len(libs) > 0:
self.create_hdfs_dir(r, lib_dir)
for l in libs:
raw_data = d.get_raw_binary(l, proxy_configs)
mfs.put_file_to_maprfs(r, raw_data, l.name, lib_dir,
hdfs_user)
uploaded_paths.append(os.path.join(lib_dir, l.name))
path = jb_manager.JOB_BINARIES. \
get_job_binary_by_url(l.url). \
copy_binary_to_cluster(l, proxy_configs=proxy_configs,
remote=r, context=context.ctx())
target = os.path.join(lib_dir, l.name)
mfs.copy_from_local(r, path, target, hdfs_user)
uploaded_paths.append(target)
for lib in builtin_libs:
mfs.put_file_to_maprfs(r, lib['raw'], lib['name'], lib_dir,
hdfs_user)

View File

@ -15,8 +15,7 @@
from sahara import conductor as c
from sahara import context
from sahara.service.edp.binary_retrievers import dispatch
from sahara.service.edp.job_binaries import manager as jb_manager
conductor = c.API
@ -69,4 +68,5 @@ def update_job_binary_internal(id, values):
def get_job_binary_data(id):
job_binary = conductor.job_binary_get(context.ctx(), id)
return dispatch.get_raw_binary(job_binary, with_context=True)
return jb_manager.JOB_BINARIES.get_job_binary(job_binary.type). \
get_raw_data(job_binary, with_context=True)

View File

@ -14,7 +14,7 @@
# limitations under the License.
from sahara.service.edp import job_utils
from sahara.service import shares as shares_service
from sahara.service.edp.utils import shares as shares_service
def get_file_info(job_binary, remote):

View File

@ -20,7 +20,7 @@ from sahara import exceptions as ex
from sahara.i18n import _
from sahara.service.edp.data_sources.base import DataSourceType
from sahara.service.edp import job_utils
from sahara.service import shares as shares_service
from sahara.service.edp.utils import shares as shares_service
class ManilaType(DataSourceType):
@ -49,6 +49,11 @@ class ManilaType(DataSourceType):
return path
def get_runtime_url(self, url, cluster):
# TODO(mariannelm): currently the get_runtime_url method is responsible
# for preparing the cluster for the manila job type which is not the
# best approach. In order to make a prepare_cluster method for manila
# the edp/job_utils.py resolve_data_source_reference function must be
# refactored
path = self._prepare_cluster(url, cluster)
# This gets us the mount point, but we need a file:// scheme to
# indicate a local filesystem path
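
As the hunk above notes, get_runtime_url() for manila both mounts the share
(via _prepare_cluster) and rewrites the result with a file:// scheme. A tiny
standalone illustration of that rewriting step (the mount path is made up):

    def to_runtime_url(mount_path):
        # The share is mounted as a local filesystem path on the cluster nodes,
        # so jobs address it through a file:// URL.
        return "file://{path}".format(path=mount_path)

    print(to_runtime_url("/mnt/share-1234/data"))  # file:///mnt/share-1234/data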

View File

@ -63,6 +63,9 @@ class SwiftType(DataSourceType):
if hasattr(data_source, "credentials"):
job_configs = kwargs.pop('job_configs')
# if no data source was passed as a reference for the job, the
# job_configs will not be changed (so it will be a FrozenDict)
# and we don't need to change it as well
if isinstance(job_configs, FrozenDict) or \
job_configs.get('configs', None) is None:
return

View File

@ -88,6 +88,14 @@ def _get_cluster_hosts_information(host, cluster):
return None
def _is_cluster_configured(cluster, host_info):
inst = u.get_instances(cluster)[0]
cat_etc_hosts = 'cat /etc/hosts'
with inst.remote() as r:
exit_code, etc_hosts = r.execute_command(cat_etc_hosts)
return all(host in etc_hosts for host in host_info)
def configure_cluster_for_hdfs(cluster, data_source_url):
host = urlparse.urlparse(data_source_url).hostname
@ -96,10 +104,15 @@ def configure_cluster_for_hdfs(cluster, data_source_url):
# Ip address hasn't been resolved, the last chance is for VM itself
return
etc_hosts_update = '/tmp/etc-hosts-update.%s' % six.text_type(
uuidutils.generate_uuid())
tmp_etc_hosts = '/tmp/etc-hosts.%s' % six.text_type(
uuidutils.generate_uuid())
# If the cluster was already configured for this data source
# there's no need to configure it again
if _is_cluster_configured(cluster, etc_hosts_information.splitlines()):
return
etc_hosts_update = ('/tmp/etc-hosts-update'
'.%s' % six.text_type(uuidutils.generate_uuid()))
tmp_etc_hosts = ('/tmp/etc-hosts'
'.%s' % six.text_type(uuidutils.generate_uuid()))
update_etc_hosts_cmd = (
'cat %(etc_hosts_update)s /etc/hosts | '
'sort | uniq > %(tmp_etc_hosts)s && '
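
The new _is_cluster_configured() helper avoids rewriting /etc/hosts when every
required entry is already present on the instance. The core check is plain
substring containment over the command output, sketched here with made-up data:

    etc_hosts = "127.0.0.1 localhost\n10.0.0.5 namenode\n"
    host_info = ["10.0.0.5 namenode"]

    # True when all required lines are already in /etc/hosts, in which case
    # configure_cluster_for_hdfs() returns without touching the cluster.
    already_configured = all(host in etc_hosts for host in host_info)
    print(already_configured)  # True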

View File

@ -47,6 +47,13 @@ class JobBinaryType(object):
:returns: String representing the local path
"""
# TODO(mariannelm): currently for the job binaries it's true
# that the raw data must be available at a FS path in the cluster, but
# for most of the job binary types there's no need to keep this data
# in the cluster after the job is done, so it would be a good thing to
# have a method responsible for removing the job binary raw data
# after the end of the job
return None
@plugins_base.required_with_default
@ -62,7 +69,7 @@ class JobBinaryType(object):
@plugins_base.optional
def _validate_url(self, url):
"""Auxiliar method used by the validate method"""
"""Auxiliary method used by the validate method"""
pass
@plugins_base.required_with_default
@ -91,6 +98,6 @@ class JobBinaryType(object):
raise ex.NotImplementedException()
@plugins_base.optional
def _generate_valid_path(self, job_exec_id, job_binary):
def _generate_valid_path(self, job_binary):
"""Generates a valid FS path for the binary be placed"""
return '/tmp/%s.%s' % (job_exec_id, job_binary.name)
return '/tmp/' + job_binary.name
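
With the job_exec_id argument gone, generated paths now depend only on the
binary name; a standalone restatement of the two one-liners from the hunk
(sample names are made up):

    def old_generate_valid_path(job_exec_id, name):
        return '/tmp/%s.%s' % (job_exec_id, name)

    def new_generate_valid_path(name):
        return '/tmp/' + name

    print(old_generate_valid_path('exec-1', 'wordcount.jar'))  # /tmp/exec-1.wordcount.jar
    print(new_generate_valid_path('wordcount.jar'))            # /tmp/wordcount.jar

This is why the unit tests later in the diff expect paths such as /tmp/jb_name.jar.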

View File

@ -14,7 +14,6 @@
# limitations under the License.
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import uuidutils
import six.moves.urllib.parse as urlparse
@ -26,16 +25,14 @@ import sahara.service.validations.edp.base as b
CONF = cfg.CONF
conductor = c.API
LOG = logging.getLogger(__name__)
class InternalDBType(JobBinaryType):
def copy_binary_to_cluster(self, job_binary, **kwargs):
# url example: 'internal-db://JobBinaryInternal-UUID'
r = kwargs.pop('remote')
job_exec_id = kwargs.pop('job_exec_id')
dst = self._generate_valid_path(job_exec_id, job_binary)
dst = self._generate_valid_path(job_binary)
raw = self.get_raw_data(job_binary, **kwargs)
r.write_file_to(dst, raw)
@ -63,11 +60,11 @@ class InternalDBType(JobBinaryType):
def _validate_url(self, url):
if len(url) == 0:
raise ex.InvalidDataException(
_("Intenal data base url must not be empty"))
_("Internal data base url must not be empty"))
url = urlparse.urlparse(url)
if url.scheme != "internal-db":
raise ex.InvalidDataException(
_("URL scheme must be 'internal-db'"))
if not uuidutils.is_uuid_like(url.netloc):
raise ex.InvalidDataException(
_("Intenal data base url netloc must be a uuid"))
_("Internal data base url netloc must be a uuid"))

View File

@ -21,7 +21,7 @@ from sahara import exceptions as ex
from sahara.i18n import _
from sahara.service.edp.job_binaries.base import JobBinaryType
from sahara.service.edp import job_utils
from sahara.service import shares as shares_service
from sahara.service.edp.utils import shares as shares_service
from sahara.utils.openstack import manila as m
conductor = c.API

View File

@ -31,9 +31,8 @@ CONF = cfg.CONF
class SwiftType(JobBinaryType):
def copy_binary_to_cluster(self, job_binary, **kwargs):
r = kwargs.pop('remote')
job_exec_id = kwargs.pop('job_exec_id')
dst = self._generate_valid_path(job_exec_id, job_binary)
dst = self._generate_valid_path(job_binary)
raw = self.get_raw_data(job_binary, **kwargs)
r.write_file_to(dst, raw)

View File

@ -13,10 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import random
import re
import string
from oslo_config import cfg
from oslo_utils import uuidutils
import six
@ -24,9 +20,8 @@ import six
from sahara import conductor as c
from sahara import context
from sahara.plugins import base as plugin_base
from sahara.service import shares as shares_service
from sahara.swift import swift_helper as sw
from sahara.utils.openstack import manila as m
from sahara.service.edp.data_sources import manager as ds_manager
from sahara.service.edp.utils import shares as shares_service
from sahara.utils import remote
@ -53,6 +48,10 @@ def get_plugin(cluster):
return plugin_base.PLUGINS.get_plugin(cluster.plugin_name)
def get_data_source(ds_name):
return ds_manager.DATA_SOURCES.get_data_source(ds_name)
def create_workflow_dir(where, path, job, use_uuid=None, chmod=""):
if use_uuid is None:
@ -68,14 +67,20 @@ def create_workflow_dir(where, path, job, use_uuid=None, chmod=""):
return constructed_dir
def get_data_sources(job_execution, job, data_source_urls, cluster=None):
def _get_data_source_urls(ds, cluster, job_exec_id):
# returns a tuple (native_url, runtime_url)
return get_data_source(ds.type).get_urls(ds.url, cluster, job_exec_id)
def get_input_output_data_sources(job_execution, job, data_source_urls,
cluster=None):
def _construct(ctx, ds_id):
job_exec_id = job_execution.id
source = conductor.data_source_get(ctx, ds_id)
if source and source.id not in data_source_urls:
url = _construct_data_source_url(source.url, job_execution.id)
runtime_url = _runtime_url(url, cluster)
data_source_urls[source.id] = (url, runtime_url)
data_source_urls[source.id] = _get_data_source_urls(source,
cluster,
job_exec_id)
return source
ctx = context.ctx()
@ -159,26 +164,6 @@ def find_possible_data_source_refs_by_uuid(job_configs):
return _data_source_ref_search(job_configs, uuidutils.is_uuid_like)
def _add_credentials_for_data_sources(ds_list, configs):
username = password = None
for src in ds_list:
if src.type == "swift" and hasattr(src, "credentials"):
if "user" in src.credentials:
username = src.credentials['user']
if "password" in src.credentials:
password = src.credentials['password']
break
# Don't overwrite if there is already a value here
if configs.get(sw.HADOOP_SWIFT_USERNAME, None) is None and (
username is not None):
configs[sw.HADOOP_SWIFT_USERNAME] = username
if configs.get(sw.HADOOP_SWIFT_PASSWORD, None) is None and (
password is not None):
configs[sw.HADOOP_SWIFT_PASSWORD] = password
def resolve_data_source_references(job_configs,
job_exec_id,
data_source_urls,
@ -236,10 +221,8 @@ def resolve_data_source_references(job_configs,
ds = ds[0]
ds_seen[ds.id] = ds
if ds.id not in data_source_urls:
url = _construct_data_source_url(ds.url, job_exec_id)
runtime_url = _runtime_url(url, cluster)
data_source_urls[ds.id] = (url, runtime_url)
data_source_urls[ds.id] = _get_data_source_urls(
ds, cluster, job_exec_id)
return data_source_urls[ds.id][1]
return value
@ -259,50 +242,21 @@ def resolve_data_source_references(job_configs,
if not ds_seen:
return [], job_configs
# If there are no proxy_configs and the user has not already set configs
# for swift credentials, set those configs based on data_sources we found
if not job_configs.get('proxy_configs'):
_add_credentials_for_data_sources(ds_seen, new_configs['configs'])
else:
# we'll need to copy these, too, so job_configs is complete
# If there are proxy_configs we'll need to copy these, too,
# so job_configs is complete
if job_configs.get('proxy_configs'):
new_configs['proxy_configs'] = {
k: v for k, v in six.iteritems(job_configs.get('proxy_configs'))}
return ds_seen, new_configs
def _construct_data_source_url(url, job_exec_id):
"""Resolve placeholders in data_source URL.
Supported placeholders:
* %RANDSTR(len)% - will be replaced with random string of lowercase
letters of length `len`.
* %JOB_EXEC_ID% - will be replaced with the job execution ID.
"""
def _randstr(match):
len = int(match.group(1))
return ''.join(random.choice(string.ascii_lowercase)
for _ in six.moves.range(len))
url = url.replace("%JOB_EXEC_ID%", job_exec_id)
url = re.sub(r"%RANDSTR\((\d+)\)%", _randstr, url)
return url
def _runtime_url(url, cluster):
if url.startswith(m.MANILA_PREFIX) and cluster:
path = shares_service.get_share_path(url, cluster.shares or [])
if path is None:
path = mount_share_at_default_path(url, cluster)
# This gets us the mount point, but we need a file:// scheme to
# indicate a local filesystem path
return "file://{path}".format(path=path)
return url
def prepare_cluster_for_ds(data_sources, cluster, job_configs, ds_urls):
for ds in data_sources:
if ds:
get_data_source(ds.type).prepare_cluster(
ds, cluster, job_configs=job_configs,
runtime_url=ds_urls[ds.id])
def to_url_dict(data_source_urls, runtime=False):
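
Engines now drive data sources through these helpers instead of building URLs
and HDFS configuration themselves. A hedged sketch of the expected call order,
assuming job_execution, job and cluster objects as used in the engines below:

    from sahara.service.edp import job_utils

    def resolve_and_prepare(job_execution, job, cluster, updated_job_configs):
        data_source_urls = {}  # ds.id -> (native_url, runtime_url)
        input_source, output_source = job_utils.get_input_output_data_sources(
            job_execution, job, data_source_urls, cluster)

        # Let each referenced data source type configure the cluster (for
        # example HDFS /etc/hosts entries or manila mounts) before the run.
        runtime_urls = job_utils.to_url_dict(data_source_urls, runtime=True)
        job_utils.prepare_cluster_for_ds([input_source, output_source],
                                         cluster, updated_job_configs,
                                         runtime_urls)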

View File

@ -24,8 +24,8 @@ import six
from sahara import conductor as c
from sahara import context
from sahara.service.edp import base_engine
from sahara.service.edp.binary_retrievers import dispatch
from sahara.service.edp import hdfs_helper as h
from sahara.service.edp.job_binaries import manager as jb_manager
from sahara.service.edp import job_utils
from sahara.service.edp.oozie import oozie as o
from sahara.service.edp.oozie.workflow_creator import workflow_factory
@ -134,7 +134,8 @@ class OozieJobEngine(base_engine.JobEngine):
prepared_job_params = {}
job = conductor.job_get(ctx, job_execution.job_id)
input_source, output_source = job_utils.get_data_sources(
input_source, output_source = job_utils.get_input_output_data_sources(
job_execution, job, data_source_urls, self.cluster)
# Updated_job_configs will be a copy of job_execution.job_configs with
@ -159,6 +160,11 @@ class OozieJobEngine(base_engine.JobEngine):
data_source_urls = job_utils.to_url_dict(data_source_urls,
runtime=True)
data_sources = additional_sources + [input_source, output_source]
job_utils.prepare_cluster_for_ds(data_sources,
self.cluster, updated_job_configs,
data_source_urls)
proxy_configs = updated_job_configs.get('proxy_configs')
configs = updated_job_configs.get('configs', {})
use_hbase_lib = configs.get('edp.hbase_common_lib', {})
@ -171,12 +177,6 @@ class OozieJobEngine(base_engine.JobEngine):
if k.startswith('oozie.'):
oozie_params[k] = configs[k]
for data_source in [input_source, output_source] + additional_sources:
if data_source and data_source.type == 'hdfs':
h.configure_cluster_for_hdfs(
self.cluster, data_source_urls[data_source.id])
break
external_hdfs_urls = self._resolve_external_hdfs_urls(
job_execution.job_configs)
for url in external_hdfs_urls:
@ -334,10 +334,16 @@ class OozieJobEngine(base_engine.JobEngine):
edp.JOB_TYPE_PIG,
edp.JOB_TYPE_SHELL]
def _prepare_job_binaries(self, job_binaries, r):
for jb in job_binaries:
jb_manager.JOB_BINARIES.get_job_binary_by_url(jb.url). \
prepare_cluster(jb, remote=r)
def _upload_job_files_to_hdfs(self, where, job_dir, job, configs,
proxy_configs=None):
mains = job.mains or []
libs = job.libs or []
mains = list(job.mains) if job.mains else []
libs = list(job.libs) if job.libs else []
builtin_libs = edp.get_builtin_binaries(job, configs)
uploaded_paths = []
hdfs_user = self.get_hdfs_user()
@ -345,33 +351,40 @@ class OozieJobEngine(base_engine.JobEngine):
lib_dir = os.path.join(job_dir, job_dir_suffix)
with remote.get_remote(where) as r:
for main in mains:
raw_data = dispatch.get_raw_binary(
main, proxy_configs=proxy_configs, remote=r)
if isinstance(raw_data, dict) and raw_data["type"] == "path":
h.copy_from_local(r, raw_data['path'],
job_dir, hdfs_user)
else:
h.put_file_to_hdfs(r, raw_data, main.name,
job_dir, hdfs_user)
uploaded_paths.append(job_dir + '/' + main.name)
job_binaries = mains + libs
self._prepare_job_binaries(job_binaries, r)
# upload mains
uploaded_paths.extend(self._upload_job_binaries(r, mains,
proxy_configs,
hdfs_user,
job_dir))
# upload libs
if len(libs) and job_dir_suffix:
# HDFS 2.2.0 fails to put file if the lib dir does not exist
self.create_hdfs_dir(r, lib_dir)
for lib in libs:
raw_data = dispatch.get_raw_binary(
lib, proxy_configs=proxy_configs, remote=remote)
if isinstance(raw_data, dict) and raw_data["type"] == "path":
h.copy_from_local(r, raw_data['path'],
lib_dir, hdfs_user)
else:
h.put_file_to_hdfs(r, raw_data, lib.name,
lib_dir, hdfs_user)
uploaded_paths.append(lib_dir + '/' + lib.name)
uploaded_paths.extend(self._upload_job_binaries(r, libs,
proxy_configs,
hdfs_user,
lib_dir))
# upload builtin_libs
for lib in builtin_libs:
h.put_file_to_hdfs(r, lib['raw'], lib['name'], lib_dir,
hdfs_user)
uploaded_paths.append(lib_dir + '/' + lib['name'])
uploaded_paths.append(lib_dir + lib['name'])
return uploaded_paths
def _upload_job_binaries(self, r, job_binaries, proxy_configs,
hdfs_user, job_dir):
uploaded_paths = []
for jb in job_binaries:
path = jb_manager.JOB_BINARIES. \
get_job_binary_by_url(jb.url). \
copy_binary_to_cluster(jb, proxy_configs=proxy_configs,
remote=r, context=context.ctx())
h.copy_from_local(r, path, job_dir, hdfs_user)
uploaded_paths.append(path)
return uploaded_paths
def _create_hdfs_workflow_dir(self, where, job):
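
The Oozie upload path is now a two-stage flow: _prepare_job_binaries() gives
each binary type a chance to set the cluster up, then _upload_job_binaries()
copies every binary to a node-local path and pushes it into HDFS. A condensed
sketch of one iteration, assuming the remote handle, hdfs_user and job_dir the
engine already has in scope:

    from sahara import context
    from sahara.service.edp import hdfs_helper as h
    from sahara.service.edp.job_binaries import manager as jb_manager

    def upload_one(jb, r, proxy_configs, hdfs_user, job_dir):
        jb_type = jb_manager.JOB_BINARIES.get_job_binary_by_url(jb.url)
        # Stage 1: per-type cluster preparation (a manila binary might mount
        # its share here; what exactly happens is up to the type).
        jb_type.prepare_cluster(jb, remote=r)
        # Stage 2: land the binary on the node, then copy it into HDFS.
        path = jb_type.copy_binary_to_cluster(jb, proxy_configs=proxy_configs,
                                              remote=r, context=context.ctx())
        h.copy_from_local(r, path, job_dir, hdfs_user)
        return path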

View File

@ -25,8 +25,7 @@ from sahara import exceptions as e
from sahara.i18n import _
from sahara.service.castellan import utils as key_manager
from sahara.service.edp import base_engine
from sahara.service.edp.binary_retrievers import dispatch
from sahara.service.edp import hdfs_helper as h
from sahara.service.edp.job_binaries import manager as jb_manager
from sahara.service.edp import job_utils
from sahara.service.validations.edp import job_execution as j
from sahara.swift import swift_helper as sw
@ -134,17 +133,20 @@ class SparkJobEngine(base_engine.JobEngine):
r.write_file_to(dst, content)
return xml_name
def _prepare_job_binaries(self, job_binaries, r):
for jb in job_binaries:
jb_manager.JOB_BINARIES.get_job_binary_by_url(jb.url). \
prepare_cluster(jb, remote=r)
def _upload_job_files(self, where, job_dir, job, job_configs):
def upload(r, dir, job_file, proxy_configs):
dst = os.path.join(dir, job_file.name)
raw_data = dispatch.get_raw_binary(
job_file, proxy_configs=proxy_configs, remote=r)
if isinstance(raw_data, dict) and raw_data["type"] == "path":
dst = raw_data['path']
else:
r.write_file_to(dst, raw_data)
return dst
path = jb_manager.JOB_BINARIES. \
get_job_binary_by_url(job_file.url). \
copy_binary_to_cluster(job_file,
proxy_configs=proxy_configs,
remote=r, context=context.ctx())
return path
def upload_builtin(r, dir, builtin):
dst = os.path.join(dir, builtin['name'])
@ -164,7 +166,11 @@ class SparkJobEngine(base_engine.JobEngine):
with remote.get_remote(where) as r:
mains = list(job.mains) if job.mains else []
libs = list(job.libs) if job.libs else []
for job_file in mains+libs:
job_binaries = mains + libs
self._prepare_job_binaries(job_binaries, r)
for job_file in job_binaries:
uploaded_paths.append(
upload(r, job_dir, job_file,
job_configs.get('proxy_configs')))
@ -310,10 +316,9 @@ class SparkJobEngine(base_engine.JobEngine):
data_source_urls = job_utils.to_url_dict(data_source_urls,
runtime=True)
for data_source in additional_sources:
if data_source and data_source.type == 'hdfs':
h.configure_cluster_for_hdfs(self.cluster, data_source)
break
job_utils.prepare_cluster_for_ds(additional_sources,
self.cluster, updated_job_configs,
data_source_urls)
# It is needed in case we are working with Spark plugin
self.plugin_params['master'] = (

View File

@ -24,7 +24,7 @@ from sahara import exceptions as e
from sahara.i18n import _
from sahara.plugins import utils as plugin_utils
from sahara.service.edp import base_engine
from sahara.service.edp.binary_retrievers import dispatch
from sahara.service.edp.job_binaries import manager as jb_manager
from sahara.service.edp import job_utils
from sahara.service.validations.edp import job_execution as j
from sahara.utils import cluster as cluster_utils
@ -111,24 +111,29 @@ class StormJobEngine(base_engine.JobEngine):
path = "service/edp/resources/launch_command.py"
return files.get_file_text(path)
def _prepare_job_binaries(self, job_binaries, r):
for jb in job_binaries:
jb_manager.JOB_BINARIES.get_job_binary_by_url(jb.url). \
prepare_cluster(jb, remote=r)
def _upload_job_files(self, where, job_dir, job, job_configs):
def upload(r, dir, job_file, proxy_configs):
dst = os.path.join(dir, job_file.name)
raw_data = dispatch.get_raw_binary(job_file,
proxy_configs=proxy_configs,
remote=remote)
if isinstance(raw_data, dict) and raw_data["type"] == "path":
dst = raw_data['path']
else:
r.write_file_to(dst, raw_data)
return dst
path = jb_manager.JOB_BINARIES. \
get_job_binary_by_url(job_file.url). \
copy_binary_to_cluster(job_file,
proxy_configs=proxy_configs,
remote=r, context=context.ctx())
return path
uploaded_paths = []
with remote.get_remote(where) as r:
mains = list(job.mains) if job.mains else []
libs = list(job.libs) if job.libs else []
for job_file in mains+libs:
job_binaries = mains + libs
self._prepare_job_binaries(job_binaries, r)
for job_file in job_binaries:
uploaded_paths.append(
upload(r, job_dir, job_file,
job_configs.get('proxy_configs')))
@ -234,6 +239,10 @@ class StormJobEngine(base_engine.JobEngine):
data_source_urls = job_utils.to_url_dict(data_source_urls,
runtime=True)
job_utils.prepare_cluster_for_ds(additional_sources,
self.cluster, updated_job_configs,
data_source_urls)
# We'll always run the driver program on the master
master = plugin_utils.get_instance(self.cluster, "nimbus")
@ -245,8 +254,6 @@ class StormJobEngine(base_engine.JobEngine):
paths = self._upload_job_files(master, wf_dir, job,
updated_job_configs)
# We can shorten the paths in this case since we'll run out of wf_dir
paths = [os.path.basename(p) for p in paths]
topology_name = self._set_topology_name(job_execution, job.name)
# Launch the storm job using storm jar

View File

View File

@ -28,9 +28,9 @@ from sahara.i18n import _
from sahara.i18n import _LE
from sahara.plugins import base as plugin_base
from sahara.service.edp import job_manager
from sahara.service.edp.utils import shares
from sahara.service.health import verification_base as ver_base
from sahara.service import ntp_service
from sahara.service import shares
from sahara.service import trusts
from sahara.utils import cluster as c_u
from sahara.utils import remote

View File

@ -16,22 +16,21 @@
import re
from oslo_config import cfg
from oslo_utils import uuidutils
import six.moves.urllib.parse as urlparse
from sahara import conductor as c
from sahara import context
import sahara.exceptions as ex
from sahara.i18n import _
import sahara.service.edp.data_sources.manager as ds_manager
import sahara.service.validations.edp.base as b
from sahara.swift import utils as su
CONF = cfg.CONF
def check_data_source_create(data, **kwargs):
b.check_data_source_unique_name(data['name'])
_check_data_source_url(data)
_check_data_source(data)
def _check_datasource_placeholder(url):
@ -41,8 +40,8 @@ def _check_datasource_placeholder(url):
substrings = re.findall(r"%RANDSTR\(([\-]?\d+)\)%", url)
for length in map(int, substrings):
if length <= 0:
total_length = -1
break
raise ex.InvalidDataException(_("Requested RANDSTR length"
" must be positive."))
total_length += length
if total_length > 1024:
@ -50,87 +49,11 @@ def _check_datasource_placeholder(url):
" too long, please choose a "
"value less than 1024."))
if total_length < 0:
raise ex.InvalidDataException(_("Requested RANDSTR length"
" must be positive."))
def _check_data_source_url(data):
def _check_data_source(data):
_check_datasource_placeholder(data["url"])
if "swift" == data["type"]:
_check_swift_data_source_create(data)
elif "hdfs" == data["type"]:
_check_hdfs_data_source_create(data)
elif "maprfs" == data["type"]:
_check_maprfs_data_source_create(data)
elif "manila" == data["type"]:
_check_manila_data_source_create(data)
def _check_swift_data_source_create(data):
if len(data['url']) == 0:
raise ex.InvalidDataException(_("Swift url must not be empty"))
url = urlparse.urlparse(data['url'])
if url.scheme != "swift":
raise ex.InvalidDataException(_("URL scheme must be 'swift'"))
# The swift url suffix does not have to be included in the netloc.
# However, if the swift suffix indicator is part of the netloc then
# we require the right suffix.
# Additionally, the path must be more than '/'
if (su.SWIFT_URL_SUFFIX_START in url.netloc and not url.netloc.endswith(
su.SWIFT_URL_SUFFIX)) or len(url.path) <= 1:
raise ex.InvalidDataException(
_("URL must be of the form swift://container%s/object")
% su.SWIFT_URL_SUFFIX)
if not CONF.use_domain_for_proxy_users and "credentials" not in data:
raise ex.InvalidCredentials(_("No credentials provided for Swift"))
if not CONF.use_domain_for_proxy_users and (
"user" not in data["credentials"]):
raise ex.InvalidCredentials(
_("User is not provided in credentials for Swift"))
if not CONF.use_domain_for_proxy_users and (
"password" not in data["credentials"]):
raise ex.InvalidCredentials(
_("Password is not provided in credentials for Swift"))
def _check_hdfs_data_source_create(data):
if len(data['url']) == 0:
raise ex.InvalidDataException(_("HDFS url must not be empty"))
url = urlparse.urlparse(data['url'])
if url.scheme:
if url.scheme != "hdfs":
raise ex.InvalidDataException(_("URL scheme must be 'hdfs'"))
if not url.hostname:
raise ex.InvalidDataException(
_("HDFS url is incorrect, cannot determine a hostname"))
def _check_maprfs_data_source_create(data):
if len(data['url']) == 0:
raise ex.InvalidDataException(_("MapR FS url must not be empty"))
url = urlparse.urlparse(data['url'])
if url.scheme:
if url.scheme != "maprfs":
raise ex.InvalidDataException(_("URL scheme must be 'maprfs'"))
def _check_manila_data_source_create(data):
if len(data['url']) == 0:
raise ex.InvalidDataException(_("Manila url must not be empty"))
url = urlparse.urlparse(data['url'])
if url.scheme != "manila":
raise ex.InvalidDataException(_("Manila url scheme must be 'manila'"))
if not uuidutils.is_uuid_like(url.netloc):
raise ex.InvalidDataException(_("Manila url netloc must be a uuid"))
if not url.path:
raise ex.InvalidDataException(_("Manila url path must not be empty"))
if data["type"] in CONF.data_source_types:
ds_manager.DATA_SOURCES.get_data_source(data["type"]).validate(data)
def check_data_source_update(data, data_source_id):
@ -151,4 +74,4 @@ def check_data_source_update(data, data_source_id):
'url': data.get('url', None) or ds.url,
'credentials': data.get(
'credentials', None) or ds.credentials}
_check_data_source_url(check_data)
_check_data_source(check_data)
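
Validation follows the same delegation pattern: the per-type _check_* helpers
removed above are replaced by a lookup of the requested type, gated by
CONF.data_source_types, followed by that type's own validate(). A short sketch
of the dispatch with a made-up request payload:

    import sahara.service.edp.data_sources.manager as ds_manager

    data = {"name": "my-input", "type": "hdfs", "url": "hdfs://namenode/input"}

    # Each registered data source type owns its own URL validation rules.
    ds_manager.DATA_SOURCES.get_data_source(data["type"]).validate(data)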

View File

@ -16,11 +16,7 @@
from oslo_config import cfg
from oslo_log import log as logging
import sahara.exceptions as e
from sahara.i18n import _LW
import sahara.service.validations.edp.base as b
import sahara.service.validations.edp.job_binary_internal as j_b_i
from sahara.swift import utils as su
import sahara.service.edp.job_binaries.manager as jb_manager
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
@ -28,20 +24,6 @@ LOG = logging.getLogger(__name__)
def check_job_binary(data, **kwargs):
job_binary_url = data.get("url", None)
extra = data.get("extra", {})
if job_binary_url:
if job_binary_url.startswith("internal-db"):
if not j_b_i.is_internal_db_enabled():
LOG.warning(_LW(
'Sahara inernal db is disabled for storing job binaries.'))
internal_uid = job_binary_url.replace(
"internal-db://", '')
b.check_job_binary_internal_exists(internal_uid)
if job_binary_url.startswith(su.SWIFT_INTERNAL_PREFIX):
if not kwargs.get('job_binary_id', None):
# Should not be checked during job binary update
if (not extra.get("user") or not extra.get("password")) and (
not CONF.use_domain_for_proxy_users):
raise e.BadJobBinaryException()
jb_manager.JOB_BINARIES.get_job_binary_by_url(job_binary_url). \
validate(data, **kwargs)

View File

@ -17,15 +17,14 @@ from oslo_config import cfg
import sahara.exceptions as e
from sahara.i18n import _
from sahara.service.edp.job_binaries import manager as jb_manager
from sahara.utils import api_validator as a
CONF = cfg.CONF
def is_internal_db_enabled():
if not CONF.edp_internal_db_enabled:
return False
return True
return 'internal-db' in jb_manager.JOB_BINARIES.get_job_binaries()
def check_job_binary_internal(data, **kwargs):

View File

@ -15,7 +15,7 @@
from sahara import exceptions as ex
from sahara.i18n import _
from sahara.service import shares
from sahara.service.edp.utils import shares
from sahara.utils.openstack import manila

View File

@ -408,7 +408,7 @@ class ClusterTest(test_base.ConductorManagerTestCase):
"tenant_id"])
self.assertEqual(args[3], {"name": "fox"})
@mock.patch("sahara.service.shares.mount_shares")
@mock.patch("sahara.service.edp.utils.shares.mount_shares")
def test_cluster_update_shares(self, mount_shares):
ctx = context.ctx()
cluster_db_obj = self.api.cluster_create(ctx, SAMPLE_CLUSTER)

View File

@ -31,7 +31,7 @@ class TestManilaShare(base.SaharaTestCase):
@mock.patch('sahara.utils.openstack.manila.client')
@mock.patch('sahara.conductor.API.cluster_update')
@mock.patch('sahara.service.shares.mount_shares')
@mock.patch('sahara.service.edp.utils.shares.mount_shares')
def test_get_file_info(self, mount_shares, cluster_update, f_manilaclient):
cluster_shares = [
{'id': 'the_share_id',

View File

@ -35,7 +35,7 @@ class TestManilaType(base.SaharaTestCase):
@mock.patch('sahara.utils.openstack.manila.client')
@mock.patch('sahara.conductor.API.cluster_update')
@mock.patch('sahara.service.shares.mount_shares')
@mock.patch('sahara.service.edp.utils.shares.mount_shares')
def test_prepare_cluster(self, mount_shares, cluster_update,
f_manilaclient):
@ -59,10 +59,10 @@ class TestManilaType(base.SaharaTestCase):
self.assertEqual(1, mount_shares.call_count)
self.assertEqual(1, cluster_update.call_count)
@mock.patch('sahara.service.shares.get_share_path')
@mock.patch('sahara.service.edp.utils.shares.get_share_path')
@mock.patch('sahara.utils.openstack.manila.client')
@mock.patch('sahara.conductor.API.cluster_update')
@mock.patch('sahara.service.shares.mount_shares')
@mock.patch('sahara.service.edp.utils.shares.mount_shares')
def test_get_runtime_url(self, mount_shares, cluster_update,
f_manilaclient, get_share_path):

View File

@ -50,12 +50,11 @@ class TestInternalDBType(testtools.TestCase):
res = self.internal_db.copy_binary_to_cluster(job_binary,
context=context,
remote=remote,
job_exec_id='id')
remote=remote)
self.assertEqual('/tmp/id.test', res)
self.assertEqual('/tmp/test', res)
remote.write_file_to.assert_called_with(
'/tmp/id.test',
'/tmp/test',
'ok')
@mock.patch('sahara.conductor.API.job_binary_internal_get_raw_data')

View File

@ -49,7 +49,7 @@ class TestManilaType(base.SaharaTestCase):
self.assertTrue(self.manila_type.
validate_job_location_format(valid_url))
@mock.patch('sahara.service.shares.default_mount')
@mock.patch('sahara.service.edp.utils.shares.default_mount')
@mock.patch('sahara.utils.openstack.manila.client')
def test_copy_binary_to_cluster(self, f_manilaclient, default_mount):
cluster_shares = [
@ -92,7 +92,7 @@ class TestManilaType(base.SaharaTestCase):
@mock.patch('sahara.utils.openstack.manila.client')
@mock.patch('sahara.conductor.API.cluster_update')
@mock.patch('sahara.service.shares.mount_shares')
@mock.patch('sahara.service.edp.utils.shares.mount_shares')
def test_prepare_cluster(self, mount_shares, cluster_update,
f_manilaclient):

View File

@ -64,12 +64,11 @@ class TestSwiftType(base.SaharaTestCase):
get_raw_data.return_value = 'test'
res = self.i_s.copy_binary_to_cluster(job_binary,
remote=remote,
job_exec_id='job_exec_id')
remote=remote)
self.assertEqual('/tmp/job_exec_id.test', res)
self.assertEqual('/tmp/test', res)
remote.write_file_to.assert_called_with(
'/tmp/job_exec_id.test',
'/tmp/test',
'test')
def test__get_raw_data(self):

View File

@ -33,5 +33,5 @@ class JobBinaryManagerSupportTest(base.SaharaTestCase):
def test_generate_valid_path(self):
jb = mock.Mock()
jb.name = 'jb_name.jar'
res = self.job_binary._generate_valid_path('job_exec_id', jb)
self.assertEqual('/tmp/job_exec_id.jb_name.jar', res)
res = self.job_binary._generate_valid_path(jb)
self.assertEqual('/tmp/jb_name.jar', res)

View File

@ -17,7 +17,9 @@ import mock
from sahara import context as ctx
from sahara.plugins import base as pb
from sahara.service.edp.job_utils import ds_manager
from sahara.service.edp.oozie import engine as oe
from sahara.service.edp.oozie.engine import jb_manager
from sahara.tests.unit import base
from sahara.tests.unit.service.edp import edp_test_utils as u
from sahara.utils import edp
@ -27,6 +29,8 @@ class TestOozieEngine(base.SaharaTestCase):
def setUp(self):
super(TestOozieEngine, self).setUp()
pb.setup_plugins()
jb_manager.setup_job_binaries()
ds_manager.setup_data_sources()
def test_get_job_status(self):
oje = FakeOozieJobEngine(u.create_cluster())
@ -98,13 +102,15 @@ class TestOozieEngine(base.SaharaTestCase):
conductor_raw_data.return_value = 'ok'
oje = FakeOozieJobEngine(u.create_cluster())
oje._prepare_job_binaries = mock.Mock()
job, _ = u.create_job_exec(edp.JOB_TYPE_PIG)
res = oje._upload_job_files_to_hdfs(mock.Mock(), 'job_prefix', job, {})
self.assertEqual(['job_prefix/script.pig'], res)
self.assertEqual(['/tmp/script.pig'], res)
job, _ = u.create_job_exec(edp.JOB_TYPE_MAPREDUCE)
res = oje._upload_job_files_to_hdfs(mock.Mock(), 'job_prefix', job, {})
self.assertEqual(['job_prefix/lib/main.jar'], res)
self.assertEqual(['/tmp/main.jar'], res)
@mock.patch('sahara.utils.remote.get_remote')
def test_upload_workflow_file(self, remote_get):
@ -176,6 +182,8 @@ class TestOozieEngine(base.SaharaTestCase):
oje.cancel_job(job_exec)
self.assertEqual(1, kill_get.call_count)
@mock.patch('sahara.service.edp.job_utils.prepare_cluster_for_ds')
@mock.patch('sahara.service.edp.job_utils._get_data_source_urls')
@mock.patch('sahara.service.edp.oozie.workflow_creator.'
'workflow_factory.get_workflow_xml')
@mock.patch('sahara.utils.remote.get_remote')
@ -183,7 +191,8 @@ class TestOozieEngine(base.SaharaTestCase):
@mock.patch('sahara.conductor.API.data_source_get')
@mock.patch('sahara.conductor.API.job_get')
def test_prepare_run_job(self, job, data_source, update,
remote, wf_factory):
remote, wf_factory, get_ds_urls,
prepare_cluster):
wf_factory.return_value = mock.MagicMock()
remote_class = mock.MagicMock()
@ -197,6 +206,8 @@ class TestOozieEngine(base.SaharaTestCase):
source = mock.MagicMock()
source.url = "localhost"
get_ds_urls.return_value = ('url', 'url')
data_source.return_value = source
oje = FakeOozieJobEngine(u.create_cluster())
_, job_exec = u.create_job_exec(edp.JOB_TYPE_PIG)
@ -208,6 +219,8 @@ class TestOozieEngine(base.SaharaTestCase):
self.assertEqual(job_exec, res['job_execution'])
self.assertEqual({}, res['oozie_params'])
@mock.patch('sahara.service.edp.job_utils.prepare_cluster_for_ds')
@mock.patch('sahara.service.edp.job_utils._get_data_source_urls')
@mock.patch('sahara.service.edp.oozie.workflow_creator.'
'workflow_factory.get_workflow_xml')
@mock.patch('sahara.utils.remote.get_remote')
@ -216,7 +229,8 @@ class TestOozieEngine(base.SaharaTestCase):
@mock.patch('sahara.conductor.API.job_get')
@mock.patch('sahara.conductor.API.job_execution_get')
def test_run_job(self, exec_get, job, data_source,
update, remote, wf_factory):
update, remote, wf_factory, get_ds_urls,
prepare_cluster):
wf_factory.return_value = mock.MagicMock()
remote_class = mock.MagicMock()
remote_class.__exit__.return_value = 'closed'
@ -230,6 +244,8 @@ class TestOozieEngine(base.SaharaTestCase):
source.url = "localhost"
data_source.return_value = source
get_ds_urls.return_value = ('url', 'url')
oje = FakeOozieJobEngine(u.create_cluster())
client_class = mock.MagicMock()
client_class.add_job = mock.MagicMock(return_value=1)

View File

@ -19,6 +19,7 @@ import os
import mock
import sahara.exceptions as ex
from sahara.service.edp.job_utils import ds_manager
from sahara.service.edp.spark import engine as se
from sahara.tests.unit import base
from sahara.utils import edp
@ -44,6 +45,8 @@ class TestSpark(base.SaharaTestCase):
self.workflow_dir = "/wfdir"
self.driver_cp = "/usr/lib/hadoop-mapreduce/hadoop-openstack.jar:"
ds_manager.setup_data_sources()
def test_get_pid_and_inst_id(self):
'''Test parsing of job ids
@ -326,9 +329,9 @@ class TestSpark(base.SaharaTestCase):
# check that we have nothing new to report ...
self.assertIsNone(status)
@mock.patch('sahara.service.edp.binary_retrievers.dispatch.get_raw_binary')
@mock.patch('sahara.service.edp.spark.engine.jb_manager')
@mock.patch('sahara.utils.remote.get_remote')
def test_upload_job_files(self, get_remote, get_raw_binary):
def test_upload_job_files(self, get_remote, jb_manager):
main_names = ["main1", "main2", "main3"]
lib_names = ["lib1", "lib2", "lib3"]
@ -352,14 +355,26 @@ class TestSpark(base.SaharaTestCase):
get_remote.return_value.__enter__ = mock.Mock(
return_value=remote_instance)
get_raw_binary.return_value = "data"
JOB_BINARIES = mock.Mock()
mock_jb = mock.Mock()
jb_manager.JOB_BINARIES = JOB_BINARIES
JOB_BINARIES.get_job_binary_by_url = mock.Mock(return_value=mock_jb)
mock_jb.copy_binary_to_cluster = mock.Mock(side_effect=[
'/somedir/main1',
'/somedir/main2',
'/somedir/main3',
'/somedir/lib1',
'/somedir/lib2',
'/somedir/lib3'])
eng = se.SparkJobEngine("cluster")
eng._prepare_job_binaries = mock.Mock()
paths, builtins = eng._upload_job_files("where", "/somedir", job, {})
self.assertEqual(["/somedir/" + n for n in main_names + lib_names],
paths)
for path in paths:
remote_instance.write_file_to.assert_any_call(path, "data")
def _make_master_instance(self, return_code=0):
master = mock.Mock()
@ -651,9 +666,9 @@ class TestSpark(base.SaharaTestCase):
edp.JOB_STATUS_RUNNING,
{"spark-path": self.workflow_dir}), status)
@mock.patch('sahara.service.edp.hdfs_helper.configure_cluster_for_hdfs')
@mock.patch('sahara.service.edp.job_utils.prepare_cluster_for_ds')
@mock.patch('sahara.service.edp.job_utils.resolve_data_source_references')
def test_external_hdfs_config(self, resolver, configurer):
def test_external_hdfs_config(self, resolver, prepare):
job_configs = {
'configs': {"edp.java.main_class": "org.me.myclass"},
}
@ -662,16 +677,17 @@ class TestSpark(base.SaharaTestCase):
data_source = mock.Mock()
data_source.type = 'hdfs'
data_source.id = 'id'
resolver.return_value = ([data_source], job_configs)
master_instance = self._make_master_instance()
self._setup_run_job(master_instance, job_configs, files)
configurer.assert_called_with("cluster", data_source)
prepare.assert_called_once()
@mock.patch('sahara.service.edp.hdfs_helper.configure_cluster_for_hdfs')
@mock.patch('sahara.service.edp.job_utils.prepare_cluster_for_ds')
@mock.patch('sahara.service.edp.job_utils.resolve_data_source_references')
def test_overridden_driver_classpath(self, resolver, configurer):
def test_overridden_driver_classpath(self, resolver, prepare):
job_configs = {
'configs': {"edp.java.main_class": "org.me.myclass",
'edp.spark.driver.classpath': "my-classpath.jar"},
@ -681,6 +697,7 @@ class TestSpark(base.SaharaTestCase):
data_source = mock.Mock()
data_source.type = 'hdfs'
data_source.id = 'id'
resolver.return_value = ([data_source], job_configs)
master_instance = self._make_master_instance()

View File

@ -16,6 +16,7 @@
import mock
from sahara.plugins.spark import shell_engine as shell_engine
from sahara.service.edp.job_utils import ds_manager
from sahara.tests.unit import base
from sahara.utils import edp
@ -30,6 +31,8 @@ class TestSparkShellEngine(base.SaharaTestCase):
self.spark_home = "/opt/spark"
self.workflow_dir = "/wfdir"
ds_manager.setup_data_sources()
def _create_master_instance(self, return_code=0):
master = mock.Mock()
master.execute_command.return_value = (return_code, self.spark_pid)

View File

@ -18,7 +18,9 @@ import os
import mock
import sahara.exceptions as ex
from sahara.service.edp.job_utils import ds_manager
from sahara.service.edp.storm import engine as se
from sahara.service.edp.storm.engine import jb_manager
from sahara.tests.unit import base
from sahara.utils import edp
@ -32,6 +34,9 @@ class TestStorm(base.SaharaTestCase):
self.storm_topology_name = "MyJob_ed8347a9-39aa-477c-8108-066202eb6130"
self.workflow_dir = "/wfdir"
jb_manager.setup_job_binaries()
ds_manager.setup_data_sources()
def test_get_topology_and_inst_id(self):
'''Test parsing of job ids
@ -210,9 +215,9 @@ class TestStorm(base.SaharaTestCase):
self.assertEqual({"status": edp.JOB_STATUS_KILLED}, status)
@mock.patch('sahara.service.edp.binary_retrievers.dispatch.get_raw_binary')
@mock.patch('sahara.service.edp.storm.engine.jb_manager')
@mock.patch('sahara.utils.remote.get_remote')
def test_upload_job_files(self, get_remote, get_raw_binary):
def test_upload_job_files(self, get_remote, jb_manager):
main_names = ["main1", "main2", "main3"]
lib_names = ["lib1", "lib2", "lib3"]
@ -225,7 +230,7 @@ class TestStorm(base.SaharaTestCase):
return objs
job = mock.Mock()
job.name = "job"
job.id = "job_exec_id"
job.mains = make_data_objects(*main_names)
job.libs = make_data_objects(*lib_names)
@ -236,14 +241,23 @@ class TestStorm(base.SaharaTestCase):
remote_instance.instance.node_group.cluster.shares = []
remote_instance.instance.node_group.shares = []
get_raw_binary.return_value = "data"
JOB_BINARIES = mock.Mock()
mock_jb = mock.Mock()
jb_manager.JOB_BINARIES = JOB_BINARIES
JOB_BINARIES.get_job_binary_by_url = mock.Mock(return_value=mock_jb)
jbs = main_names + lib_names
mock_jb.copy_binary_to_cluster = mock.Mock(
side_effect=['/tmp/%s.%s' % (job.id, j) for j in jbs])
eng = se.StormJobEngine("cluster")
eng._prepare_job_binaries = mock.Mock()
paths = eng._upload_job_files("where", "/somedir", job, {})
self.assertEqual(["/somedir/" + n for n in main_names + lib_names],
self.assertEqual(['/tmp/%s.%s' % (job.id, j) for j in jbs],
paths)
for path in paths:
remote_instance.write_file_to.assert_any_call(path, "data")
def _make_master_instance(self, return_code=0):
master = mock.Mock()
@ -346,7 +360,7 @@ class TestStorm(base.SaharaTestCase):
'cd %(workflow_dir)s; '
'./launch_command /usr/local/storm/bin/storm jar '
'-c nimbus.host=master '
'app.jar org.me.myclass %(topology_name)s '
'%(workflow_dir)s/app.jar org.me.myclass %(topology_name)s '
'> /dev/null 2>&1 & echo $!' % {"workflow_dir": self.workflow_dir,
"topology_name": (
self.storm_topology_name)})
@ -375,7 +389,8 @@ class TestStorm(base.SaharaTestCase):
'cd %(workflow_dir)s; '
'./launch_command /usr/local/storm/bin/storm jar '
'-c nimbus.host=master '
'app.jar org.me.myclass %(topology_name)s input_arg output_arg '
'%(workflow_dir)s/app.jar org.me.myclass %(topology_name)s '
'input_arg output_arg '
'> /dev/null 2>&1 & echo $!' % {"workflow_dir": self.workflow_dir,
"topology_name": (
self.storm_topology_name)})

View File

@ -118,11 +118,14 @@ class HDFSHelperTestCase(base.SaharaTestCase):
res = helper._get_cluster_hosts_information('host', self.cluster)
self.assertEqual(res, mock_generate())
@mock.patch('sahara.service.edp.hdfs_helper._is_cluster_configured')
@mock.patch('six.text_type')
@mock.patch('sahara.plugins.utils.get_instances')
@mock.patch(('sahara.service.edp.hdfs_helper._get_cluster_hosts_'
'information'))
def test_configure_cluster_for_hdfs(self, mock_helper, mock_get, mock_six):
def test_configure_cluster_for_hdfs(self, mock_helper, mock_get, mock_six,
cluster_conf):
cluster_conf.return_value = False
inst = mock.MagicMock()
inst.remote = mock.MagicMock()
mock_six.return_value = 111
@ -139,6 +142,29 @@ class HDFSHelperTestCase(base.SaharaTestCase):
mock.call().__enter__().execute_command(str2, run_as_root=True),
mock.call().__exit__(None, None, None)])
@mock.patch('sahara.plugins.utils.get_instances')
def test_is_cluster_configured(self, mock_get):
inst = mock.Mock()
r = mock.MagicMock()
inst.remote = mock.Mock(return_value=r)
enter_r = mock.Mock()
enter_r.execute_command = mock.Mock()
enter_r.execute_command.return_value = 0, "127.0.0.1 localhost\n" + \
"127.0.0.2 t1 t1"
r.__enter__.return_value = enter_r
cmd = 'cat /etc/hosts'
host_info = ['127.0.0.1 localhost', '127.0.0.2 t1 t1']
mock_get.return_value = [inst]
res = helper._is_cluster_configured(self.cluster, host_info)
self.assertTrue(res)
enter_r.execute_command.assert_called_with(cmd)
enter_r.execute_command.return_value = 0, "127.0.0.1 localhost\n"
res = helper._is_cluster_configured(self.cluster, host_info)
self.assertFalse(res)
enter_r.execute_command.assert_called_with(cmd)
@mock.patch('six.text_type')
@mock.patch('os.open')
def test_put_file_to_hdfs(self, open_get, mock_six):

View File

@ -25,6 +25,7 @@ from sahara.plugins import base as pb
from sahara.service.castellan import config as castellan
from sahara.service.edp import job_manager
from sahara.service.edp import job_utils
from sahara.service.edp.job_utils import ds_manager
from sahara.service.edp.oozie.workflow_creator import workflow_factory
from sahara.swift import swift_helper as sw
from sahara.swift import utils as su
@ -47,6 +48,7 @@ class TestJobManager(base.SaharaWithDbTestCase):
p.patch_minidom_writexml()
pb.setup_plugins()
castellan.validate_config()
ds_manager.setup_data_sources()
@mock.patch('uuid.uuid4')
@mock.patch('sahara.utils.remote.get_remote')
@ -562,9 +564,9 @@ class TestJobManager(base.SaharaWithDbTestCase):
job_manager._run_job(job_exec.id)
@mock.patch('sahara.conductor.API.data_source_get')
def test_get_data_sources(self, ds):
def test_get_input_output_data_sources(self, ds):
def _conductor_data_source_get(ctx, id):
return mock.Mock(id=id, url="obj_" + id)
return mock.Mock(id=id, url="hdfs://obj_" + id, type='hdfs')
job, job_exec = u.create_job_exec(edp.JOB_TYPE_PIG)
@ -573,18 +575,18 @@ class TestJobManager(base.SaharaWithDbTestCase):
ds.side_effect = _conductor_data_source_get
input_source, output_source = (
job_utils.get_data_sources(job_exec, job, {}))
job_utils.get_input_output_data_sources(job_exec, job, {}))
self.assertEqual('obj_s1', input_source.url)
self.assertEqual('obj_s2', output_source.url)
self.assertEqual('hdfs://obj_s1', input_source.url)
self.assertEqual('hdfs://obj_s2', output_source.url)
def test_get_data_sources_with_null_id(self):
def test_get_input_output_data_sources_with_null_id(self):
configs = {sw.HADOOP_SWIFT_USERNAME: 'admin',
sw.HADOOP_SWIFT_PASSWORD: 'admin1'}
configs = {
'configs': configs,
'args': ['swift://ex/i',
'args': ['hdfs://ex/i',
'output_path']
}
@ -594,7 +596,7 @@ class TestJobManager(base.SaharaWithDbTestCase):
job_exec.output_id = None
input_source, output_source = (
job_utils.get_data_sources(job_exec, job, {}))
job_utils.get_input_output_data_sources(job_exec, job, {}))
self.assertIsNone(input_source)
self.assertIsNone(output_source)

View File

@ -19,6 +19,7 @@ from oslo_utils import uuidutils
import testtools
from sahara import conductor as cond
from sahara.service.edp.data_sources import manager as ds_manager
from sahara.service.edp import job_utils
from sahara.tests.unit.service.edp import edp_test_utils as u
@ -29,6 +30,7 @@ class JobUtilsTestCase(testtools.TestCase):
def setUp(self):
super(JobUtilsTestCase, self).setUp()
ds_manager.setup_data_sources()
def test_args_may_contain_data_sources(self):
job_configs = None
@ -205,39 +207,6 @@ class JobUtilsTestCase(testtools.TestCase):
job_exec_id, urls)
self.assertEqual(2, len(ds))
self.assertEqual([input.url, output_url, input.url], nc['args'])
# Swift configs should be filled in since they were blank
self.assertEqual(input.credentials['user'],
nc['configs']['fs.swift.service.sahara.username'])
self.assertEqual(input.credentials['password'],
nc['configs']['fs.swift.service.sahara.password'])
self.assertEqual(2, len(urls))
self.assertItemsEqual({input.id: (input_url, input_url),
output.id: (output_url, output_url)}, urls)
job_configs['configs'] = {'fs.swift.service.sahara.username': 'sam',
'fs.swift.service.sahara.password': 'gamgee',
job_utils.DATA_SOURCE_SUBST_NAME: False,
job_utils.DATA_SOURCE_SUBST_UUID: True}
ds, nc = job_utils.resolve_data_source_references(job_configs,
job_exec_id, {})
self.assertEqual(2, len(ds))
self.assertEqual([name_ref, output_url, input.url], nc['args'])
# Swift configs should not be overwritten
self.assertEqual(job_configs['configs'], nc['configs'])
job_configs['configs'] = {job_utils.DATA_SOURCE_SUBST_NAME: True,
job_utils.DATA_SOURCE_SUBST_UUID: False}
job_configs['proxy_configs'] = {'proxy_username': 'john',
'proxy_password': 'smith',
'proxy_trust_id': 'trustme'}
ds, nc = job_utils.resolve_data_source_references(job_configs,
job_exec_id, {})
self.assertEqual(1, len(ds))
self.assertEqual([input.url, output.id, input.id], nc['args'])
# Swift configs should be empty and proxy configs should be preserved
self.assertEqual(job_configs['configs'], nc['configs'])
self.assertEqual(job_configs['proxy_configs'], nc['proxy_configs'])
# Substitution not enabled
job_configs['configs'] = {job_utils.DATA_SOURCE_SUBST_NAME: False,
@ -270,28 +239,20 @@ class JobUtilsTestCase(testtools.TestCase):
job_utils.to_url_dict(data_source_urls,
runtime=True))
def test_construct_data_source_url_no_placeholders(self):
base_url = "swift://container/input"
job_exec_id = uuidutils.generate_uuid()
@mock.patch('sahara.service.edp.hdfs_helper.configure_cluster_for_hdfs')
def test_prepare_cluster_for_ds(self, configure):
data_source_urls = {'1': '1_runtime',
'2': '2_runtime'}
url = job_utils._construct_data_source_url(base_url, job_exec_id)
data_source = mock.Mock()
data_source.type = 'hdfs'
data_source.id = '1'
self.assertEqual(base_url, url)
cluster = mock.Mock()
job_configs = mock.Mock()
def test_construct_data_source_url_job_exec_id_placeholder(self):
base_url = "swift://container/input.%JOB_EXEC_ID%.out"
job_exec_id = uuidutils.generate_uuid()
job_utils.prepare_cluster_for_ds([data_source], cluster, job_configs,
data_source_urls)
url = job_utils._construct_data_source_url(base_url, job_exec_id)
self.assertEqual(
"swift://container/input." + job_exec_id + ".out", url)
def test_construct_data_source_url_randstr_placeholder(self):
base_url = "swift://container/input.%RANDSTR(4)%.%RANDSTR(7)%.out"
job_exec_id = uuidutils.generate_uuid()
url = job_utils._construct_data_source_url(base_url, job_exec_id)
self.assertRegex(
url, "swift://container/input\.[a-z]{4}\.[a-z]{7}\.out")
configure.assert_called_once()
configure.assert_called_with(cluster, '1_runtime')

View File

@ -23,7 +23,7 @@ from oslo_utils import uuidutils
import testtools
from sahara import exceptions
from sahara.service import shares
from sahara.service.edp.utils import shares
from sahara.tests.unit import base
_NAMENODE_IPS = ['192.168.122.3', '192.168.122.4']

View File

@ -19,330 +19,17 @@ from oslo_utils import uuidutils
import testtools
import sahara.exceptions as ex
from sahara.service.api import v10 as api
import sahara.service.edp.data_sources.manager as ds_manager
from sahara.service.validations.edp import data_source as ds
from sahara.service.validations.edp import data_source_schema as ds_schema
from sahara.swift import utils as su
from sahara.tests.unit.service.validation import utils as u
SAMPLE_SWIFT_URL = "swift://1234/object"
SAMPLE_SWIFT_URL_WITH_SUFFIX = "swift://1234%s/object" % su.SWIFT_URL_SUFFIX
class TestDataSourceCreateValidation(u.ValidationTestCase):
def setUp(self):
super(TestDataSourceCreateValidation, self).setUp()
self._create_object_fun = ds.check_data_source_create
self.scheme = ds_schema.DATA_SOURCE_SCHEMA
api.plugin_base.setup_plugins()
def test_swift_creation(self):
data = {
"name": "test_data_data_source",
"url": SAMPLE_SWIFT_URL,
"type": "swift",
"credentials": {
"user": "user",
"password": "password"
},
"description": "long description"
}
self._assert_types(data)
@mock.patch("sahara.service.validations."
"edp.base.check_data_source_unique_name")
def test_swift_creation_missing_credentials(self,
check_data_source_unique_name):
check_data_source_unique_name.return_value = True
data = {
"name": "test_data_data_source",
"url": SAMPLE_SWIFT_URL,
"type": "swift",
"description": "long description"
}
with testtools.ExpectedException(ex.InvalidCredentials):
ds.check_data_source_create(data)
# proxy enabled should allow creation without credentials
self.override_config('use_domain_for_proxy_users', True)
ds.check_data_source_create(data)
@mock.patch("sahara.service.validations."
"edp.base.check_data_source_unique_name")
def test_swift_creation_credentials_missing_user(
self,
check_data_source_unique_name):
check_data_source_unique_name.return_value = True
data = {
"name": "test_data_data_source",
"url": SAMPLE_SWIFT_URL,
"type": "swift",
"credentials": {
"password": "password"
},
"description": "long description"
}
with testtools.ExpectedException(ex.InvalidCredentials):
ds.check_data_source_create(data)
# proxy enabled should allow creation without credentials
self.override_config('use_domain_for_proxy_users', True)
ds.check_data_source_create(data)
@mock.patch("sahara.service.validations."
"edp.base.check_data_source_unique_name")
def test_swift_creation_credentials_missing_password(
self,
check_data_source_unique_name):
check_data_source_unique_name.return_value = True
data = {
"name": "test_data_data_source",
"url": SAMPLE_SWIFT_URL,
"type": "swift",
"credentials": {
"user": "user",
},
"description": "long description"
}
with testtools.ExpectedException(ex.InvalidCredentials):
ds.check_data_source_create(data)
# proxy enabled should allow creation without credentials
self.override_config('use_domain_for_proxy_users', True)
ds.check_data_source_create(data)
@mock.patch("sahara.service.validations."
"edp.base.check_data_source_unique_name")
def test_swift_creation_wrong_schema(self, check_data_source_unique_name):
check_data_source_unique_name.return_value = True
data = {
"name": "test_data_data_source",
"url": "swif://1234/object",
"type": "swift",
"description": "incorrect url schema"
}
with testtools.ExpectedException(ex.InvalidDataException):
ds.check_data_source_create(data)
@mock.patch("sahara.service.validations."
"edp.base.check_data_source_unique_name")
def test_swift_creation_explicit_suffix(self,
check_data_source_unique_name):
check_data_source_unique_name.return_value = True
data = {
"name": "test_data_data_source",
"url": SAMPLE_SWIFT_URL_WITH_SUFFIX,
"type": "swift",
"description": "incorrect url schema",
"credentials": {
"user": "user",
"password": "password"
}
}
self._assert_types(data)
@mock.patch("sahara.service.validations."
"edp.base.check_data_source_unique_name")
def test_swift_creation_wrong_suffix(self,
check_data_source_unique_name):
check_data_source_unique_name.return_value = True
data = {
"name": "test_data_data_source",
"url": "swift://1234.suffix/object",
"type": "swift",
"description": "incorrect url schema"
}
with testtools.ExpectedException(ex.InvalidDataException):
ds.check_data_source_create(data)
@mock.patch("sahara.service.validations."
"edp.base.check_data_source_unique_name")
def test_swift_creation_missing_object(self,
check_data_source_unique_name):
check_data_source_unique_name.return_value = True
data = {
"name": "test_data_data_source",
"url": "swift://1234/",
"type": "swift",
"description": "incorrect url schema"
}
with testtools.ExpectedException(ex.InvalidDataException):
ds.check_data_source_create(data)
@mock.patch("sahara.service.validations."
"edp.base.check_data_source_unique_name")
def test_hdfs_creation_wrong_schema(self, check_data_source_unique_name):
check_data_source_unique_name.return_value = True
data = {
"name": "test_data_data_source",
"url": "hdf://test_cluster/",
"type": "hdfs",
"description": "incorrect url schema"
}
with testtools.ExpectedException(ex.InvalidDataException):
ds.check_data_source_create(data)
@mock.patch("sahara.service.validations."
"edp.base.check_data_source_unique_name")
def test_hdfs_creation_correct_url(self, check_data_source_unique_name):
check_data_source_unique_name.return_value = True
data = {
"name": "test_data_data_source",
"url": "hdfs://test_cluster/",
"type": "hdfs",
"description": "correct url schema"
}
ds.check_data_source_create(data)
@mock.patch("sahara.service.validations."
"edp.base.check_data_source_unique_name")
def test_hdfs_creation_local_rel_url(self, check_data_source_unique_name):
check_data_source_unique_name.return_value = True
data = {
"name": "test_data_data_source",
"url": "mydata/input",
"type": "hdfs",
"description": "correct url schema for relative path on local hdfs"
}
ds.check_data_source_create(data)
@mock.patch("sahara.service.validations."
"edp.base.check_data_source_unique_name")
def test_hdfs_creation_local_abs_url(self, check_data_source_unique_name):
check_data_source_unique_name.return_value = True
data = {
"name": "test_data_data_source",
"url": "/tmp/output",
"type": "hdfs",
"description": "correct url schema for absolute path on local hdfs"
}
ds.check_data_source_create(data)
@mock.patch("sahara.service.validations."
"edp.base.check_data_source_unique_name")
def test_maprfs_creation_wrong_schema(self, check_data_source_unique_name):
check_data_source_unique_name.return_value = True
data = {
"name": "test_data_data_source",
"url": "maprf://test_cluster/",
"type": "maprfs",
"description": "incorrect url schema"
}
with testtools.ExpectedException(ex.InvalidDataException):
ds.check_data_source_create(data)
@mock.patch("sahara.service.validations."
"edp.base.check_data_source_unique_name")
def test_maprfs_creation_correct_url(self, check_data_source_unique_name):
check_data_source_unique_name.return_value = True
data = {
"name": "test_data_data_source",
"url": "maprfs:///test_cluster/",
"type": "maprfs",
"description": "correct url schema"
}
ds.check_data_source_create(data)
@mock.patch("sahara.service.validations."
"edp.base.check_data_source_unique_name")
def test_maprfs_creation_local_rel_url(self, check_ds_unique_name):
check_ds_unique_name.return_value = True
data = {
"name": "test_data_data_source",
"url": "mydata/input",
"type": "maprfs",
"description": ("correct url schema for"
" relative path on local maprfs")
}
ds.check_data_source_create(data)
@mock.patch("sahara.service.validations."
"edp.base.check_data_source_unique_name")
def test_maprfs_creation_local_abs_url(self, check_ds_unique_name):
check_ds_unique_name.return_value = True
data = {
"name": "test_data_data_source",
"url": "/tmp/output",
"type": "maprfs",
"description": ("correct url schema for"
" absolute path on local maprfs")
}
ds.check_data_source_create(data)
@mock.patch("sahara.service.validations."
"edp.base.check_data_source_unique_name")
def test_manila_creation_wrong_schema(self, check_ds_unique_name):
check_ds_unique_name.return_value = True
data = {
"name": "test_data_data_source",
"url": "man://%s" % uuidutils.generate_uuid(),
"type": "manila",
"description": ("incorrect url schema for")
}
with testtools.ExpectedException(ex.InvalidDataException):
ds.check_data_source_create(data)
@mock.patch("sahara.service.validations."
"edp.base.check_data_source_unique_name")
def test_manila_creation_empty_url(self, check_ds_unique_name):
check_ds_unique_name.return_value = True
data = {
"name": "test_data_data_source",
"url": "",
"type": "manila",
"description": ("empty url")
}
with testtools.ExpectedException(ex.InvalidDataException):
ds.check_data_source_create(data)
@mock.patch("sahara.service.validations."
"edp.base.check_data_source_unique_name")
def test_manila_creation_no_uuid(self, check_ds_unique_name):
check_ds_unique_name.return_value = True
data = {
"name": "test_data_data_source",
"url": "manila://bob",
"type": "manila",
"description": ("netloc is not a uuid")
}
with testtools.ExpectedException(ex.InvalidDataException):
ds.check_data_source_create(data)
@mock.patch("sahara.service.validations."
"edp.base.check_data_source_unique_name")
def test_manila_creation_no_path(self, check_ds_unique_name):
check_ds_unique_name.return_value = True
data = {
"name": "test_data_data_source",
"url": "manila://%s" % uuidutils.generate_uuid(),
"type": "manila",
"description": ("netloc is not a uuid")
}
with testtools.ExpectedException(ex.InvalidDataException):
ds.check_data_source_create(data)
@mock.patch("sahara.service.validations."
"edp.base.check_data_source_unique_name")
def test_manila_correct(self, check_ds_unique_name):
check_ds_unique_name.return_value = True
data = {
"name": "test_data_data_source",
"url": "manila://%s/foo" % uuidutils.generate_uuid(),
"type": "manila",
"description": ("correct url")
}
self._assert_types(data)
class TestDataSourceUpdateValidation(u.ValidationTestCase):
def setUp(self):
super(TestDataSourceUpdateValidation, self).setUp()
ds_manager.setup_data_sources()
def _update_swift(self):
with testtools.ExpectedException(ex.InvalidDataException):
ds.check_data_source_update({'url': 'swift://cont/obj'}, 'ds_id')
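
The manila cases above pin down the URL shape the data source validation is
expected to enforce: a manila scheme, a share UUID as the netloc and a
non-empty path. A minimal sketch of such a check, reusing only imports that
already appear elsewhere in the tree; the function name is illustrative, not
the plugin's actual hook:

    import six.moves.urllib.parse as urlparse
    from oslo_utils import uuidutils

    def manila_url_looks_valid(url):
        # Mirrors the cases exercised above: an empty url, a non-uuid
        # netloc and a missing path are all rejected.
        parsed = urlparse.urlparse(url)
        return (parsed.scheme == "manila" and
                uuidutils.is_uuid_like(parsed.netloc) and
                len(parsed.path) > 1)

Checked against the data dicts above, only the test_manila_correct url would
pass this sketch.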

View File

@ -13,8 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from sahara.service.api import v10 as api
from sahara.service.validations.edp import job_binary as b
from sahara.service.validations.edp.job_binary import jb_manager
from sahara.service.validations.edp import job_binary_schema as b_s
from sahara.swift import utils as su
from sahara.tests.unit.service.validation import utils as u
@ -26,8 +29,18 @@ class TestJobBinaryValidation(u.ValidationTestCase):
self._create_object_fun = b.check_job_binary
self.scheme = b_s.JOB_BINARY_SCHEMA
api.plugin_base.setup_plugins()
jb_manager.setup_job_binaries()
@mock.patch('sahara.utils.api_validator.jb_manager')
def test_creation(self, mock_jb_manager):
JOB_BINARIES = mock.Mock()
mock_jb = mock.Mock()
mock_jb_manager.JOB_BINARIES = JOB_BINARIES
JOB_BINARIES.get_job_binary_by_url = mock.Mock(return_value=mock_jb)
mock_jb.validate_job_location_format = mock.Mock(return_value=True)
def test_creation(self):
data = {
"name": "main.jar",
"url": "internal-db://3e4651a5-1f08-4880-94c4-596372b37c64",
@ -39,7 +52,15 @@ class TestJobBinaryValidation(u.ValidationTestCase):
}
self._assert_types(data)
def test_job_binary_create_swift(self):
@mock.patch('sahara.utils.api_validator.jb_manager')
def test_job_binary_create_swift(self, mock_jb_manager):
JOB_BINARIES = mock.Mock()
mock_jb = mock.Mock()
mock_jb_manager.JOB_BINARIES = JOB_BINARIES
JOB_BINARIES.get_job_binary_by_url = mock.Mock(return_value=mock_jb)
mock_jb.validate_job_location_format = mock.Mock(return_value=True)
self._assert_create_object_validation(
data={
"name": "j_o_w",
@ -55,7 +76,15 @@ class TestJobBinaryValidation(u.ValidationTestCase):
"url": su.SWIFT_INTERNAL_PREFIX + "o.sahara/k"
})
def test_job_binary_create_internal(self):
@mock.patch('sahara.utils.api_validator.jb_manager')
def test_job_binary_create_internal(self, mock_jb_manager):
JOB_BINARIES = mock.Mock()
mock_jb = mock.Mock()
mock_jb_manager.JOB_BINARIES = JOB_BINARIES
JOB_BINARIES.get_job_binary_by_url = mock.Mock(return_value=mock_jb)
mock_jb.validate_job_location_format = mock.Mock(return_value=False)
self._assert_create_object_validation(
data={
"name": "main.jar",
@ -64,3 +93,21 @@ class TestJobBinaryValidation(u.ValidationTestCase):
bad_req_i=(1, "VALIDATION_ERROR",
"url: 'internal-db://abacaba' is not a "
"'valid_job_location'"))
@mock.patch('sahara.utils.api_validator.jb_manager')
def test_job_binary_create_manila(self, mock_jb_manager):
JOB_BINARIES = mock.Mock()
mock_jb = mock.Mock()
mock_jb_manager.JOB_BINARIES = JOB_BINARIES
JOB_BINARIES.get_job_binary_by_url = mock.Mock(return_value=mock_jb)
mock_jb.validate_job_location_format = mock.Mock(return_value=False)
self._assert_create_object_validation(
data={
"name": "main.jar",
"url": "manila://abacaba",
},
bad_req_i=(1, "VALIDATION_ERROR",
"url: 'manila://abacaba' is not a "
"'valid_job_location'"))

View File

@ -17,6 +17,7 @@ import mock
from sahara.service.api import v10 as api
from sahara.service.validations.edp import job_binary_internal as jb
from sahara.service.validations.edp.job_binary_internal import jb_manager
from sahara.service.validations.edp import job_binary_internal_schema as jbs
from sahara.tests.unit.service.validation import utils as u
@ -26,6 +27,7 @@ class TestJobBinaryInternalCreateValidation(u.ValidationTestCase):
super(TestJobBinaryInternalCreateValidation, self).setUp()
self._create_object_fun = jb.check_job_binary_internal
api.plugin_base.setup_plugins()
jb_manager.setup_job_binaries()
def test_job_binary_internal_create(self):
self._assert_create_object_validation(data='text')

View File

@ -18,10 +18,8 @@ import re
import jsonschema
from oslo_utils import uuidutils
import six
import six.moves.urllib.parse as urlparse
from sahara.swift import utils as su
from sahara.utils.openstack import manila as m
from sahara.service.edp.job_binaries import manager as jb_manager
@jsonschema.FormatChecker.cls_checks('valid_name_hostname')
@ -63,19 +61,9 @@ def validate_job_location_format(entry):
# should fail type validation
return True
if entry.startswith('internal-db://'):
return uuidutils.is_uuid_like(entry[len("internal-db://"):])
if entry.startswith(su.SWIFT_INTERNAL_PREFIX):
# TODO(nprivalova):add hostname validation
return True
if entry.startswith(m.MANILA_PREFIX):
url = urlparse.urlparse(entry)
if (uuidutils.is_uuid_like(url.netloc) and
len(url.path) > 1):
return True
return False
return jb_manager.JOB_BINARIES \
.get_job_binary_by_url(entry) \
.validate_job_location_format(entry)
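
# A minimal sketch of the per-type hook this dispatch assumes, modelled on
# the internal-db check that previously lived inline here. The class name is
# hypothetical; only the validate_job_location_format() signature is taken
# from the code above.
class _InternalDBLikeJobBinaryType(object):
    def validate_job_location_format(self, url):
        # get_job_binary_by_url(url) is assumed to hand back an object
        # exposing this method for the url's scheme.
        prefix = 'internal-db://'
        if not url.startswith(prefix):
            return False
        # uuidutils is the oslo_utils import kept at the top of this module.
        return uuidutils.is_uuid_like(url[len(prefix):])
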
@jsonschema.FormatChecker.cls_checks('valid_tag')