Allow job binaries to be retrieved from internal swift
* also adds job-binaries/<id>/data to REST api * limit the size of job binary data retrieved from swift or the internal db Fixes: bug #1228128 Change-Id: Ib0278d7aee334b628089b7108c5bd437845ceaee
This commit is contained in:
parent
12b470ab2b
commit
134e241e97
@ -27,6 +27,10 @@
|
|||||||
# Use Neutron or Nova Network (boolean value)
|
# Use Neutron or Nova Network (boolean value)
|
||||||
#use_neutron=false
|
#use_neutron=false
|
||||||
|
|
||||||
|
# The maximum length of job binary data in Kilobytes that
|
||||||
|
# may be stored or retrieved in a single operation
|
||||||
|
#job_binary_max_KB=5120
|
||||||
|
|
||||||
#
|
#
|
||||||
# Options defined in savanna.main
|
# Options defined in savanna.main
|
||||||
#
|
#
|
||||||
|
@ -187,3 +187,12 @@ def job_binary_internal_delete(job_binary_internal_id):
|
|||||||
@v.check_exists(api.get_job_binary_internal, 'job_binary_internal_id')
|
@v.check_exists(api.get_job_binary_internal, 'job_binary_internal_id')
|
||||||
def job_binary_internal_data(job_binary_internal_id):
|
def job_binary_internal_data(job_binary_internal_id):
|
||||||
return api.get_job_binary_internal_data(job_binary_internal_id)
|
return api.get_job_binary_internal_data(job_binary_internal_id)
|
||||||
|
|
||||||
|
|
||||||
|
@rest.get('/job-binaries/<job_binary_id>/data')
|
||||||
|
@v.check_exists(api.get_job_binary, 'job_binary_id')
|
||||||
|
def job_binary_data(job_binary_id):
|
||||||
|
data = api.get_job_binary_data(job_binary_id)
|
||||||
|
if type(data) == dict:
|
||||||
|
data = u.render(data)
|
||||||
|
return data
|
||||||
|
@ -371,7 +371,6 @@ class ConductorManager(db_base.Base):
|
|||||||
# This is nice, since data could be big...
|
# This is nice, since data could be big...
|
||||||
values = copy.deepcopy(values)
|
values = copy.deepcopy(values)
|
||||||
values['tenant_id'] = context.tenant_id
|
values['tenant_id'] = context.tenant_id
|
||||||
values['datasize'] = len(values["data"])
|
|
||||||
return self.db.job_binary_internal_create(context, values)
|
return self.db.job_binary_internal_create(context, values)
|
||||||
|
|
||||||
def job_binary_internal_destroy(self, context, job_binary_internal):
|
def job_binary_internal_destroy(self, context, job_binary_internal):
|
||||||
|
@ -28,6 +28,13 @@ cli_opts = [
|
|||||||
'headers and bodies')
|
'headers and bodies')
|
||||||
]
|
]
|
||||||
|
|
||||||
|
edp_opts = [
|
||||||
|
cfg.IntOpt('job_binary_max_KB',
|
||||||
|
default=5120,
|
||||||
|
help='Maximum length of job binary data in Kilobytes that '
|
||||||
|
'may be stored or retrieved in a single operation')
|
||||||
|
]
|
||||||
|
|
||||||
networking_opts = [
|
networking_opts = [
|
||||||
cfg.BoolOpt('use_floating_ips',
|
cfg.BoolOpt('use_floating_ips',
|
||||||
default=True,
|
default=True,
|
||||||
@ -65,6 +72,7 @@ cfg.set_defaults(log.log_opts, default_log_levels=[
|
|||||||
CONF = cfg.CONF
|
CONF = cfg.CONF
|
||||||
CONF.register_cli_opts(cli_opts)
|
CONF.register_cli_opts(cli_opts)
|
||||||
CONF.register_opts(networking_opts)
|
CONF.register_opts(networking_opts)
|
||||||
|
CONF.register_opts(edp_opts)
|
||||||
|
|
||||||
ARGV = []
|
ARGV = []
|
||||||
|
|
||||||
|
@ -17,6 +17,7 @@
|
|||||||
|
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
|
from oslo.config import cfg
|
||||||
import sqlalchemy as sa
|
import sqlalchemy as sa
|
||||||
|
|
||||||
from savanna.db.sqlalchemy import models as m
|
from savanna.db.sqlalchemy import models as m
|
||||||
@ -31,6 +32,8 @@ LOG = logging.getLogger(__name__)
|
|||||||
get_engine = db_session.get_engine
|
get_engine = db_session.get_engine
|
||||||
get_session = db_session.get_session
|
get_session = db_session.get_session
|
||||||
|
|
||||||
|
CONF = cfg.CONF
|
||||||
|
|
||||||
|
|
||||||
def get_backend():
|
def get_backend():
|
||||||
"""The backend is this module itself."""
|
"""The backend is this module itself."""
|
||||||
@ -685,9 +688,17 @@ def job_binary_internal_get(context, job_binary_internal_id):
|
|||||||
def job_binary_internal_get_raw_data(context, job_binary_internal_id):
|
def job_binary_internal_get_raw_data(context, job_binary_internal_id):
|
||||||
"""Returns only the data field for the specified JobBinaryInternal."""
|
"""Returns only the data field for the specified JobBinaryInternal."""
|
||||||
query = model_query(m.JobBinaryInternal, context)
|
query = model_query(m.JobBinaryInternal, context)
|
||||||
query = query.options(sa.orm.undefer("data"))
|
|
||||||
res = query.filter_by(id=job_binary_internal_id).first()
|
res = query.filter_by(id=job_binary_internal_id).first()
|
||||||
|
|
||||||
if res is not None:
|
if res is not None:
|
||||||
|
datasize_KB = res.datasize / 1024.0
|
||||||
|
if datasize_KB > CONF.job_binary_max_KB:
|
||||||
|
raise ex.DataTooBigException(round(datasize_KB, 1),
|
||||||
|
CONF.job_binary_max_KB,
|
||||||
|
"Size of internal binary (%sKB) is "
|
||||||
|
"greater than the maximum (%sKB)")
|
||||||
|
|
||||||
|
# This assignment is sufficient to load the deferred column
|
||||||
res = res.data
|
res = res.data
|
||||||
return res
|
return res
|
||||||
|
|
||||||
@ -697,6 +708,14 @@ def job_binary_internal_create(context, values):
|
|||||||
|
|
||||||
The data column uses deferred loading.
|
The data column uses deferred loading.
|
||||||
"""
|
"""
|
||||||
|
values["datasize"] = len(values["data"])
|
||||||
|
datasize_KB = values["datasize"] / 1024.0
|
||||||
|
if datasize_KB > CONF.job_binary_max_KB:
|
||||||
|
raise ex.DataTooBigException(round(datasize_KB, 1),
|
||||||
|
CONF.job_binary_max_KB,
|
||||||
|
"Size of internal binary (%sKB) is "
|
||||||
|
"greater than the maximum (%sKB)")
|
||||||
|
|
||||||
job_binary_int = m.JobBinaryInternal()
|
job_binary_int = m.JobBinaryInternal()
|
||||||
job_binary_int.update(values)
|
job_binary_int.update(values)
|
||||||
|
|
||||||
|
@ -122,7 +122,9 @@ class BadJobBinaryException(SavannaException):
|
|||||||
message = "To work with JobBinary located in internal swift add 'user'" \
|
message = "To work with JobBinary located in internal swift add 'user'" \
|
||||||
" and 'password' to extra"
|
" and 'password' to extra"
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self, message=None):
|
||||||
|
if message:
|
||||||
|
self.message = message
|
||||||
self.code = "BAD_JOB_BINARY"
|
self.code = "BAD_JOB_BINARY"
|
||||||
|
|
||||||
|
|
||||||
@ -149,3 +151,25 @@ class MissingFloatingNetworkException(SavannaException):
|
|||||||
self.message = ("Node Group %s is missing 'floating_ip_pool' "
|
self.message = ("Node Group %s is missing 'floating_ip_pool' "
|
||||||
"field" % ng_name)
|
"field" % ng_name)
|
||||||
self.code = "MISSING_FLOATING_NETWORK"
|
self.code = "MISSING_FLOATING_NETWORK"
|
||||||
|
|
||||||
|
|
||||||
|
class SwiftClientException(SavannaException):
|
||||||
|
'''General wrapper object for swift client exceptions
|
||||||
|
|
||||||
|
This exception is intended for wrapping the message from a
|
||||||
|
swiftclient.ClientException in a SavannaException. The ClientException
|
||||||
|
should be caught and an instance of SwiftClientException raised instead.
|
||||||
|
'''
|
||||||
|
def __init__(self, message):
|
||||||
|
self.message = message
|
||||||
|
self.code = "SWIFT_CLIENT_EXCEPTION"
|
||||||
|
|
||||||
|
|
||||||
|
class DataTooBigException(SavannaException):
|
||||||
|
message = "Size of data (%s) is greater than maximum (%s)"
|
||||||
|
|
||||||
|
def __init__(self, size, maximum, message=None):
|
||||||
|
if message:
|
||||||
|
self.message = message
|
||||||
|
self.message = self.message % (size, maximum)
|
||||||
|
self.code = "DATA_TOO_BIG"
|
||||||
|
@ -18,6 +18,7 @@ from savanna import conductor as c
|
|||||||
from savanna import context
|
from savanna import context
|
||||||
from savanna.openstack.common import log as logging
|
from savanna.openstack.common import log as logging
|
||||||
|
|
||||||
|
from savanna.service.edp.binary_retrievers import dispatch
|
||||||
from savanna.service.edp import job_manager as manager
|
from savanna.service.edp import job_manager as manager
|
||||||
from savanna.service.edp.workflow_creator import workflow_factory as w_f
|
from savanna.service.edp.workflow_creator import workflow_factory as w_f
|
||||||
|
|
||||||
@ -124,3 +125,8 @@ def delete_job_binary_internal(id):
|
|||||||
|
|
||||||
def get_job_binary_internal_data(id):
|
def get_job_binary_internal_data(id):
|
||||||
return conductor.job_binary_internal_get_raw_data(context.ctx(), id)
|
return conductor.job_binary_internal_get_raw_data(context.ctx(), id)
|
||||||
|
|
||||||
|
|
||||||
|
def get_job_binary_data(id):
|
||||||
|
job_binary = conductor.job_binary_get(context.ctx(), id)
|
||||||
|
return dispatch.get_raw_binary(job_binary)
|
||||||
|
@ -16,11 +16,15 @@
|
|||||||
from savanna import context
|
from savanna import context
|
||||||
from savanna.service.edp.binary_retrievers import internal_swift as i_swift
|
from savanna.service.edp.binary_retrievers import internal_swift as i_swift
|
||||||
from savanna.service.edp.binary_retrievers import savanna_db as db
|
from savanna.service.edp.binary_retrievers import savanna_db as db
|
||||||
|
from savanna.swift import utils as su
|
||||||
|
|
||||||
|
|
||||||
def get_raw_binary(job_binary):
|
def get_raw_binary(job_binary):
|
||||||
url = job_binary.url
|
url = job_binary.url
|
||||||
if url.startswith("savanna-db://"):
|
if url.startswith("savanna-db://"):
|
||||||
return db.get_raw_data(context.ctx(), job_binary)
|
res = db.get_raw_data(context.ctx(), job_binary)
|
||||||
if url.startswith("internal-swift://"):
|
|
||||||
return i_swift.get_raw_data(context.ctx(), job_binary)
|
if url.startswith(su.SWIFT_INTERNAL_PREFIX):
|
||||||
|
res = i_swift.get_raw_data(context.ctx(), job_binary)
|
||||||
|
|
||||||
|
return res
|
||||||
|
@ -13,6 +13,75 @@
|
|||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
|
from oslo.config import cfg
|
||||||
|
import swiftclient
|
||||||
|
|
||||||
|
import savanna.exceptions as ex
|
||||||
|
from savanna.swift import utils as su
|
||||||
|
|
||||||
|
|
||||||
|
CONF = cfg.CONF
|
||||||
|
|
||||||
|
|
||||||
|
def _get_conn(user, password):
|
||||||
|
return swiftclient.Connection(su.retrieve_auth_url(append_tokens=False),
|
||||||
|
user,
|
||||||
|
password,
|
||||||
|
tenant_name=CONF.os_admin_tenant_name,
|
||||||
|
auth_version="2.0")
|
||||||
|
|
||||||
|
|
||||||
def get_raw_data(context, job_binary):
|
def get_raw_data(context, job_binary):
|
||||||
pass
|
|
||||||
|
user = job_binary.extra["user"]
|
||||||
|
password = job_binary.extra["password"]
|
||||||
|
|
||||||
|
conn = _get_conn(user, password)
|
||||||
|
|
||||||
|
if not job_binary.url.startswith(su.SWIFT_INTERNAL_PREFIX):
|
||||||
|
# This should have been guaranteed already,
|
||||||
|
# but we'll check just in case.
|
||||||
|
raise ex.BadJobBinaryException("Url for binary in internal swift "
|
||||||
|
"must start with %s"
|
||||||
|
% su.SWIFT_INTERNAL_PREFIX)
|
||||||
|
|
||||||
|
names = job_binary.url[len(su.SWIFT_INTERNAL_PREFIX):].split("/", 1)
|
||||||
|
if len(names) == 1:
|
||||||
|
# We are getting a whole container, return as a dictionary.
|
||||||
|
container = names[0]
|
||||||
|
|
||||||
|
# First check the size...
|
||||||
|
try:
|
||||||
|
headers = conn.head_container(container)
|
||||||
|
total_KB = int(headers.get('x-container-bytes-used', 0)) / 1024.0
|
||||||
|
if total_KB > CONF.job_binary_max_KB:
|
||||||
|
raise ex.DataTooBigException(round(total_KB, 1),
|
||||||
|
CONF.job_binary_max_KB,
|
||||||
|
"Size of swift container (%sKB) "
|
||||||
|
"is greater than maximum (%sKB)")
|
||||||
|
|
||||||
|
body = {}
|
||||||
|
headers, objects = conn.get_container(names[0])
|
||||||
|
for item in objects:
|
||||||
|
headers, obj = conn.get_object(names[0], item["name"])
|
||||||
|
body[item["name"]] = obj
|
||||||
|
except swiftclient.ClientException as e:
|
||||||
|
raise ex.SwiftClientException(e.message)
|
||||||
|
|
||||||
|
else:
|
||||||
|
container, obj = names
|
||||||
|
try:
|
||||||
|
# First check the size
|
||||||
|
headers = conn.head_object(container, obj)
|
||||||
|
total_KB = int(headers.get('content-length', 0)) / 1024.0
|
||||||
|
if total_KB > CONF.job_binary_max_KB:
|
||||||
|
raise ex.DataTooBigException(round(total_KB, 1),
|
||||||
|
CONF.job_binary_max_KB,
|
||||||
|
"Size of swift object (%sKB) "
|
||||||
|
"is greater than maximum (%sKB)")
|
||||||
|
|
||||||
|
headers, body = conn.get_object(container, obj)
|
||||||
|
except swiftclient.ClientException as e:
|
||||||
|
raise ex.SwiftClientException(e.message)
|
||||||
|
|
||||||
|
return body
|
||||||
|
@ -15,6 +15,7 @@
|
|||||||
|
|
||||||
import savanna.exceptions as e
|
import savanna.exceptions as e
|
||||||
import savanna.service.validations.edp.base as b
|
import savanna.service.validations.edp.base as b
|
||||||
|
from savanna.swift import utils as su
|
||||||
|
|
||||||
JOB_BINARY_SCHEMA = {
|
JOB_BINARY_SCHEMA = {
|
||||||
"type": "object",
|
"type": "object",
|
||||||
@ -48,7 +49,7 @@ JOB_BINARY_SCHEMA = {
|
|||||||
def check_job_binary(data, **kwargs):
|
def check_job_binary(data, **kwargs):
|
||||||
job_binary_location_type = data["url"]
|
job_binary_location_type = data["url"]
|
||||||
extra = data.get("extra", {})
|
extra = data.get("extra", {})
|
||||||
if job_binary_location_type.startswith("swift-internal"):
|
if job_binary_location_type.startswith(su.SWIFT_INTERNAL_PREFIX):
|
||||||
if not extra.get("user") or not extra.get("password"):
|
if not extra.get("user") or not extra.get("password"):
|
||||||
raise e.BadJobBinaryException()
|
raise e.BadJobBinaryException()
|
||||||
if job_binary_location_type.startswith("savanna-db"):
|
if job_binary_location_type.startswith("savanna-db"):
|
||||||
|
@ -18,6 +18,7 @@ import logging
|
|||||||
from oslo.config import cfg
|
from oslo.config import cfg
|
||||||
|
|
||||||
from savanna import context
|
from savanna import context
|
||||||
|
from savanna.swift import utils as s
|
||||||
from savanna.utils import xmlutils as x
|
from savanna.utils import xmlutils as x
|
||||||
|
|
||||||
|
|
||||||
@ -27,12 +28,6 @@ HADOOP_SWIFT_AUTH_URL = 'fs.swift.service.savanna.auth.url'
|
|||||||
HADOOP_SWIFT_TENANT = 'fs.swift.service.savanna.tenant'
|
HADOOP_SWIFT_TENANT = 'fs.swift.service.savanna.tenant'
|
||||||
|
|
||||||
|
|
||||||
def _retrieve_auth_url():
|
|
||||||
url = "{0}://{1}:{2}/v2.0/tokens/".format(
|
|
||||||
CONF.os_auth_protocol, CONF.os_auth_host, CONF.os_auth_port)
|
|
||||||
return url
|
|
||||||
|
|
||||||
|
|
||||||
def _retrieve_tenant():
|
def _retrieve_tenant():
|
||||||
try:
|
try:
|
||||||
return context.current().tenant_name
|
return context.current().tenant_name
|
||||||
@ -47,7 +42,7 @@ def get_swift_configs():
|
|||||||
configs = x.load_hadoop_xml_defaults('swift/resources/conf-template.xml')
|
configs = x.load_hadoop_xml_defaults('swift/resources/conf-template.xml')
|
||||||
for conf in configs:
|
for conf in configs:
|
||||||
if conf['name'] == HADOOP_SWIFT_AUTH_URL:
|
if conf['name'] == HADOOP_SWIFT_AUTH_URL:
|
||||||
conf['value'] = _retrieve_auth_url()
|
conf['value'] = s.retrieve_auth_url()
|
||||||
if conf['name'] == HADOOP_SWIFT_TENANT:
|
if conf['name'] == HADOOP_SWIFT_TENANT:
|
||||||
conf['value'] = _retrieve_tenant()
|
conf['value'] = _retrieve_tenant()
|
||||||
|
|
||||||
|
32
savanna/swift/utils.py
Normal file
32
savanna/swift/utils.py
Normal file
@ -0,0 +1,32 @@
|
|||||||
|
# Copyright (c) 2013 Red Hat, Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||||
|
# implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
from oslo.config import cfg
|
||||||
|
|
||||||
|
CONF = cfg.CONF
|
||||||
|
|
||||||
|
SWIFT_INTERNAL_PREFIX = "swift-internal://"
|
||||||
|
|
||||||
|
#TODO(tmckay): support swift-external in a future version
|
||||||
|
# SWIFT_EXTERNAL_PREFIX = "swift-external://"
|
||||||
|
|
||||||
|
|
||||||
|
def retrieve_auth_url(append_tokens=True):
|
||||||
|
url = "{0}://{1}:{2}/v2.0/{3}".format(
|
||||||
|
CONF.os_auth_protocol,
|
||||||
|
CONF.os_auth_host,
|
||||||
|
CONF.os_auth_port,
|
||||||
|
"tokens/" if append_tokens else "")
|
||||||
|
return url
|
@ -15,6 +15,7 @@
|
|||||||
|
|
||||||
from savanna.service import api
|
from savanna.service import api
|
||||||
from savanna.service.validations.edp import job_binary as b
|
from savanna.service.validations.edp import job_binary as b
|
||||||
|
from savanna.swift import utils as su
|
||||||
from savanna.tests.unit.service.validation import utils as u
|
from savanna.tests.unit.service.validation import utils as u
|
||||||
|
|
||||||
|
|
||||||
@ -40,7 +41,7 @@ class TestJobBinaryValidation(u.ValidationTestCase):
|
|||||||
self._assert_create_object_validation(
|
self._assert_create_object_validation(
|
||||||
data={
|
data={
|
||||||
"name": "j_o_w",
|
"name": "j_o_w",
|
||||||
"url": "swift-internal://o.savanna/k"
|
"url": su.SWIFT_INTERNAL_PREFIX+"o.savanna/k"
|
||||||
},
|
},
|
||||||
bad_req_i=(1, "BAD_JOB_BINARY",
|
bad_req_i=(1, "BAD_JOB_BINARY",
|
||||||
"To work with JobBinary located in internal "
|
"To work with JobBinary located in internal "
|
||||||
|
@ -39,7 +39,7 @@ class SwiftIntegrationTestCase(unittest2.TestCase):
|
|||||||
tenant_name='test_tenant',
|
tenant_name='test_tenant',
|
||||||
token='test_auth_token'))
|
token='test_auth_token'))
|
||||||
|
|
||||||
@mock.patch('savanna.swift.swift_helper._retrieve_auth_url')
|
@mock.patch('savanna.swift.utils.retrieve_auth_url')
|
||||||
def test_get_swift_configs(self, authUrlConfig):
|
def test_get_swift_configs(self, authUrlConfig):
|
||||||
authUrlConfig.return_value = "http://localhost:8080/v2.0/tokens"
|
authUrlConfig.return_value = "http://localhost:8080/v2.0/tokens"
|
||||||
|
|
||||||
|
@ -19,6 +19,7 @@ import jsonschema
|
|||||||
import six
|
import six
|
||||||
|
|
||||||
from savanna.openstack.common import uuidutils
|
from savanna.openstack.common import uuidutils
|
||||||
|
from savanna.swift import utils as su
|
||||||
|
|
||||||
|
|
||||||
@jsonschema.FormatChecker.cls_checks('valid_name')
|
@jsonschema.FormatChecker.cls_checks('valid_name')
|
||||||
@ -33,8 +34,8 @@ def validate_name_format(entry):
|
|||||||
def validate_job_location_format(entry):
|
def validate_job_location_format(entry):
|
||||||
if entry.startswith('savanna-db://'):
|
if entry.startswith('savanna-db://'):
|
||||||
return uuidutils.is_uuid_like(entry[len("savanna-db://"):])
|
return uuidutils.is_uuid_like(entry[len("savanna-db://"):])
|
||||||
if (entry.startswith('swift-internal://') or
|
if entry.startswith(su.SWIFT_INTERNAL_PREFIX):
|
||||||
entry.startswith('swift-external://')):
|
#TODO(tmckay):allow su.SWIFT_EXTERNAL_PREFIX in a future version
|
||||||
#TODO(nprivalova):add hostname validation
|
#TODO(nprivalova):add hostname validation
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
Loading…
Reference in New Issue
Block a user