Updating JobBinaries to use proxy for Swift access
Changes:
* refactoring get_raw_binary to accept proxy configs
* refactoring get_raw_data to use proxy Swift connection when necessary
* adding function to get a Swift Connection object from proxy user
* refactoring upload_job_files_to_hdfs and upload_job_files to use proxy user when necessary
* changing JobBinary JSON schema to allow blank username/password if proxy domains are being used
* adding function to get the Swift public endpoint for the current project
* adding test for JobBinary creation without credentials

Partial-implements: blueprint edp-swift-trust-authentication
Change-Id: I02e76016194fbbb62b8ab7b304eecc53d580a79c
This commit is contained in:
parent
c332e4f96d
commit
f3b2a30309
@ -19,12 +19,12 @@ from sahara.service.edp.binary_retrievers import sahara_db as db
|
||||
from sahara.swift import utils as su
|
||||
|
||||
|
||||
def get_raw_binary(job_binary):
|
||||
def get_raw_binary(job_binary, proxy_configs=None):
|
||||
url = job_binary.url
|
||||
if url.startswith("internal-db://"):
|
||||
res = db.get_raw_data(context.ctx(), job_binary)
|
||||
|
||||
if url.startswith(su.SWIFT_INTERNAL_PREFIX):
|
||||
res = i_swift.get_raw_data(context.ctx(), job_binary)
|
||||
res = i_swift.get_raw_data(context.ctx(), job_binary, proxy_configs)
|
||||
|
||||
return res
|
||||
|
@ -21,6 +21,7 @@ import sahara.exceptions as ex
|
||||
from sahara.i18n import _
|
||||
from sahara.swift import swift_helper
|
||||
from sahara.swift import utils as su
|
||||
from sahara.utils.openstack import keystone as k
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
@ -34,18 +35,30 @@ def _get_conn(user, password):
|
||||
auth_version="2.0")
|
||||
|
||||
|
||||
def _get_conn_for_proxy_user(configs):
|
||||
preauthurl = su.retrieve_preauth_url()
|
||||
proxyclient = k.client_for_proxy_user(configs['proxy_username'],
|
||||
configs['proxy_password'],
|
||||
configs['proxy_trust_id'])
|
||||
return swiftclient.Connection(preauthurl=preauthurl,
|
||||
preauthtoken=proxyclient.auth_token,
|
||||
auth_version='2.0')
|
||||
|
||||
|
||||
def _strip_sahara_suffix(container_name):
|
||||
if container_name.endswith(su.SWIFT_URL_SUFFIX):
|
||||
container_name = container_name[:-len(su.SWIFT_URL_SUFFIX)]
|
||||
return container_name
|
||||
|
||||
|
||||
def get_raw_data(context, job_binary):
|
||||
def get_raw_data(context, job_binary, proxy_configs=None):
|
||||
if proxy_configs:
|
||||
conn = _get_conn_for_proxy_user(proxy_configs)
|
||||
else:
|
||||
user = job_binary.extra["user"]
|
||||
password = job_binary.extra["password"]
|
||||
|
||||
user = job_binary.extra["user"]
|
||||
password = job_binary.extra["password"]
|
||||
|
||||
conn = _get_conn(user, password)
|
||||
conn = _get_conn(user, password)
|
||||
|
||||
if not (job_binary.url.startswith(su.SWIFT_INTERNAL_PREFIX)):
|
||||
# This should have been guaranteed already,
|
||||
|
@ -44,14 +44,15 @@ def get_plugin(cluster):
|
||||
return plugin_base.PLUGINS.get_plugin(cluster.plugin_name)
|
||||
|
||||
|
||||
def upload_job_files(where, job_dir, job, libs_subdir=True):
|
||||
def upload_job_files(where, job_dir, job, libs_subdir=True,
|
||||
proxy_configs=None):
|
||||
mains = job.mains or []
|
||||
libs = job.libs or []
|
||||
uploaded_paths = []
|
||||
|
||||
def upload(r, dir, job_file):
|
||||
dst = os.path.join(dir, job_file.name)
|
||||
raw_data = dispatch.get_raw_binary(job_file)
|
||||
raw_data = dispatch.get_raw_binary(job_file, proxy_configs)
|
||||
r.write_file_to(dst, raw_data)
|
||||
uploaded_paths.append(dst)
|
||||
|
||||
|
@ -80,6 +80,7 @@ class OozieJobEngine(base_engine.JobEngine):
|
||||
job = conductor.job_get(ctx, job_execution.job_id)
|
||||
input_source, output_source = job_utils.get_data_sources(job_execution,
|
||||
job)
|
||||
proxy_configs = job_execution.job_configs.get('proxy_configs')
|
||||
|
||||
for data_source in [input_source, output_source]:
|
||||
if data_source and data_source.type == 'hdfs':
|
||||
@ -93,7 +94,8 @@ class OozieJobEngine(base_engine.JobEngine):
|
||||
oozie_server = self.get_oozie_server(self.cluster)
|
||||
|
||||
wf_dir = self._create_hdfs_workflow_dir(oozie_server, job)
|
||||
self._upload_job_files_to_hdfs(oozie_server, wf_dir, job)
|
||||
self._upload_job_files_to_hdfs(oozie_server, wf_dir, job,
|
||||
proxy_configs)
|
||||
|
||||
wf_xml = workflow_factory.get_workflow_xml(
|
||||
job, self.cluster, job_execution, input_source, output_source,
|
||||
@ -152,7 +154,8 @@ class OozieJobEngine(base_engine.JobEngine):
|
||||
edp.JOB_TYPE_MAPREDUCE_STREAMING,
|
||||
edp.JOB_TYPE_PIG]
|
||||
|
||||
def _upload_job_files_to_hdfs(self, where, job_dir, job):
|
||||
def _upload_job_files_to_hdfs(self, where, job_dir, job,
|
||||
proxy_configs=None):
|
||||
mains = job.mains or []
|
||||
libs = job.libs or []
|
||||
uploaded_paths = []
|
||||
@ -160,11 +163,11 @@ class OozieJobEngine(base_engine.JobEngine):
|
||||
|
||||
with remote.get_remote(where) as r:
|
||||
for main in mains:
|
||||
raw_data = dispatch.get_raw_binary(main)
|
||||
raw_data = dispatch.get_raw_binary(main, proxy_configs)
|
||||
h.put_file_to_hdfs(r, raw_data, main.name, job_dir, hdfs_user)
|
||||
uploaded_paths.append(job_dir + '/' + main.name)
|
||||
for lib in libs:
|
||||
raw_data = dispatch.get_raw_binary(lib)
|
||||
raw_data = dispatch.get_raw_binary(lib, proxy_configs)
|
||||
# HDFS 2.2.0 fails to put file if the lib dir does not exist
|
||||
self.create_hdfs_dir(r, job_dir + "/lib")
|
||||
h.put_file_to_hdfs(r, raw_data, lib.name, job_dir + "/lib",
|
||||
|
@ -112,6 +112,8 @@ class SparkJobEngine(base_engine.JobEngine):
|
||||
ctx = context.ctx()
|
||||
job = conductor.job_get(ctx, job_execution.job_id)
|
||||
|
||||
proxy_configs = job_execution.job_configs.get('proxy_configs')
|
||||
|
||||
# We'll always run the driver program on the master
|
||||
master = plugin_utils.get_instance(self.cluster, "master")
|
||||
|
||||
@ -120,7 +122,8 @@ class SparkJobEngine(base_engine.JobEngine):
|
||||
wf_dir = job_utils.create_workflow_dir(master, '/tmp/spark-edp', job,
|
||||
job_execution.id)
|
||||
paths = job_utils.upload_job_files(master, wf_dir, job,
|
||||
libs_subdir=False)
|
||||
libs_subdir=False,
|
||||
proxy_configs=proxy_configs)
|
||||
|
||||
# We can shorten the paths in this case since we'll run out of wf_dir
|
||||
paths = [os.path.basename(p) for p in paths]
|
||||
|
@ -13,10 +13,13 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from oslo.config import cfg
|
||||
|
||||
import sahara.exceptions as e
|
||||
import sahara.service.validations.edp.base as b
|
||||
from sahara.swift import utils as su
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
JOB_BINARY_SCHEMA = {
|
||||
"type": "object",
|
||||
@ -52,7 +55,8 @@ def check_job_binary(data, **kwargs):
|
||||
job_binary_location_type = data["url"]
|
||||
extra = data.get("extra", {})
|
||||
if job_binary_location_type.startswith(su.SWIFT_INTERNAL_PREFIX):
|
||||
if not extra.get("user") or not extra.get("password"):
|
||||
if (not extra.get("user") or not extra.get("password")) and (
|
||||
not CONF.use_domain_for_proxy_users):
|
||||
raise e.BadJobBinaryException()
|
||||
if job_binary_location_type.startswith("internal-db"):
|
||||
internal_uid = job_binary_location_type[len("internal-db://"):]
|
||||
|
@ -17,6 +17,7 @@ from oslo.config import cfg
|
||||
from six.moves.urllib import parse as urlparse
|
||||
|
||||
from sahara import context
|
||||
from sahara.utils.openstack import keystone as k
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
@ -34,3 +35,18 @@ def retrieve_auth_url():
|
||||
info = urlparse.urlparse(context.current().auth_uri)
|
||||
|
||||
return "%s://%s:%s/%s/" % (info.scheme, info.hostname, info.port, 'v2.0')
|
||||
|
||||
|
||||
def retrieve_preauth_url():
|
||||
'''This function returns the storage URL for Swift in the current project.
|
||||
|
||||
:returns: The storage URL for the current project's Swift store, or None
|
||||
if it can't be found.
|
||||
|
||||
'''
|
||||
client = k.client()
|
||||
catalog = client.service_catalog.get_endpoints('object-store')
|
||||
for ep in catalog.get('object-store'):
|
||||
if ep.get('interface') == 'public':
|
||||
return ep.get('url')
|
||||
return None
|
||||
|
@ -321,7 +321,7 @@ class TestSpark(base.SaharaTestCase):
|
||||
def test_run_job(self, ctx, job_get, get_instance, create_workflow_dir,
|
||||
upload_job_files, get_config_value, get_remote):
|
||||
|
||||
def fix_get(field, default):
|
||||
def fix_get(field, default=None):
|
||||
if field == "args":
|
||||
return ["input_arg", "output_arg"]
|
||||
return default
|
||||
|
@ -47,6 +47,12 @@ class TestJobBinaryValidation(u.ValidationTestCase):
|
||||
bad_req_i=(1, "BAD_JOB_BINARY",
|
||||
"To work with JobBinary located in internal "
|
||||
"swift add 'user' and 'password' to extra"))
|
||||
self.override_config('use_domain_for_proxy_users', True)
|
||||
self._assert_create_object_validation(
|
||||
data={
|
||||
"name": "j_o_w",
|
||||
"url": su.SWIFT_INTERNAL_PREFIX + "o.sahara/k"
|
||||
})
|
||||
|
||||
def test_job_binary_create_internal(self):
|
||||
self._assert_create_object_validation(
|
||||
|
Loading…
Reference in New Issue
Block a user