Decouple StorletData from StorletRequest/Response
Each StorletRequest or StorletResponse can contain multiple StorletData when using multi input/output feature. Change-Id: I755583116d55588083ec9703de28344f3e20f7ee
This commit is contained in:
parent
f1ee4f16a3
commit
639fee6cd2
|
@ -154,40 +154,39 @@ class StorletData(object):
|
|||
return self.data_fd is not None
|
||||
|
||||
|
||||
class StorletRequest(StorletData):
|
||||
class StorletRequest(object):
|
||||
"""
|
||||
The StorletRequest class represents a request to be processed by
|
||||
the storlet.
|
||||
"""
|
||||
|
||||
required_options = []
|
||||
|
||||
def __init__(self, storlet_id, params, user_metadata,
|
||||
data_iter=None, data_fd=None, options=None,
|
||||
timeout=10, cancel=None):
|
||||
def __init__(self, storlet_id, params, data, options=None,
|
||||
extra_data_list=None):
|
||||
"""
|
||||
:param storlet_id: storlet id
|
||||
:param params: parameters for storlet execution
|
||||
:param user_metadata: user metadata related to the data to be processed
|
||||
:param data_iter: iterator to read data to be processed
|
||||
:param data_fd: File descriptor to read data to be processed
|
||||
:param data: StorletData instance
|
||||
:param options: options specific to StorletRequest types
|
||||
:param timeout: Timeout to be set for data reading
|
||||
:param cancel: cancel operation to be executed when timeout happens
|
||||
:param extra_data_list: List of StorletData instances
|
||||
:raises ValueError: when some of the required options are missing
|
||||
"""
|
||||
super(StorletRequest, self).__init__(
|
||||
user_metadata, data_iter, data_fd, timeout, cancel)
|
||||
self.storlet_id = storlet_id
|
||||
self.params = copy.deepcopy(params)
|
||||
if options is None:
|
||||
self.options = {}
|
||||
else:
|
||||
self.options = options
|
||||
self.data = data
|
||||
self.options = options or {}
|
||||
self.extra_data_list = extra_data_list or []
|
||||
|
||||
for opt in self.required_options:
|
||||
if options.get(opt) is None:
|
||||
raise ValueError('Required option %s is missing' % opt)
|
||||
|
||||
|
||||
class StorletResponse(StorletData):
|
||||
def __init__(self, user_metadata, data_iter=None, data_fd=None,
|
||||
timeout=10, cancel=None):
|
||||
super(StorletResponse, self).__init__(
|
||||
user_metadata, data_iter, data_fd, timeout, cancel)
|
||||
class StorletResponse(object):
|
||||
"""
|
||||
The StorletResponse class represents a response from the storlet
|
||||
"""
|
||||
|
||||
def __init__(self, data):
|
||||
self.data = data
|
||||
|
|
|
@ -39,5 +39,5 @@ class StorletGatewayBase(object, metaclass=abc.ABCMeta):
|
|||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def invocation_flow(self, sreq, extra_sources=None):
|
||||
def invocation_flow(self, sreq):
|
||||
pass
|
||||
|
|
|
@ -24,33 +24,22 @@ from storlets.gateway.gateways.container.runtime import RunTimePaths, \
|
|||
|
||||
|
||||
class ContainerStorletRequest(StorletRequest):
|
||||
"""
|
||||
The ContainerStorletRequest class represents a request to be processed by
|
||||
the storlet the request is derived from the Swift request and
|
||||
essentially consists of:
|
||||
1. A data stream to be processed
|
||||
2. Metadata identifying the stream
|
||||
"""
|
||||
|
||||
# TODO(takashi): Some of following parameters should be defined common
|
||||
# parameters for StorletRequest
|
||||
required_options = ['storlet_main', 'storlet_language', 'file_manager']
|
||||
|
||||
def __init__(self, storlet_id, params, user_metadata, data_iter=None,
|
||||
data_fd=None, options=None):
|
||||
def __init__(self, storlet_id, params, data, options=None,
|
||||
extra_data_list=None):
|
||||
"""
|
||||
:param storlet_id: storlet id
|
||||
:param params: execution parameters
|
||||
:param user_metadata: user metadata
|
||||
:param data_iter: an iterator to read data
|
||||
:param data_fd: a file descriptor to read data
|
||||
:param options: a dictionaly which stores gateway specific options.
|
||||
:param data: StorletData instance
|
||||
:param options: options specific to ContainerStorletRequest
|
||||
:param extra_data_list: List of StorletData instances
|
||||
:raises ValueError: when some of the required options (storlet_main
|
||||
and file_manager) are missing
|
||||
"""
|
||||
super(ContainerStorletRequest, self).__init__(
|
||||
storlet_id, params, user_metadata, data_iter, data_fd,
|
||||
options=options)
|
||||
storlet_id, params, data, options, extra_data_list)
|
||||
|
||||
self.generate_log = self.options.get('generate_log', False)
|
||||
|
||||
|
@ -181,14 +170,11 @@ class StorletGatewayContainer(StorletGatewayBase):
|
|||
raise ValueError('Mandatory parameter is missing'
|
||||
': {0}'.format(md))
|
||||
|
||||
def invocation_flow(self, sreq, extra_sources=None):
|
||||
def invocation_flow(self, sreq):
|
||||
"""
|
||||
Invoke the backend protocol with gateway
|
||||
|
||||
:param sreq: StorletRequest instance
|
||||
:param extra_sources (WIP): A list of StorletRequest instance to gather
|
||||
as extra resoureces to feed to storlet
|
||||
container as data source
|
||||
:return: StorletResponse instance
|
||||
"""
|
||||
run_time_sbox = self.sandbox(self.scope, self.conf, self.logger)
|
||||
|
@ -204,8 +190,7 @@ class StorletGatewayContainer(StorletGatewayBase):
|
|||
storlet_pipe_path,
|
||||
slog_path,
|
||||
self.storlet_timeout,
|
||||
self.logger,
|
||||
extra_sources=extra_sources)
|
||||
self.logger)
|
||||
|
||||
sresp = sprotocol.communicate()
|
||||
|
||||
|
|
|
@ -32,7 +32,7 @@ from storlets.sbus import file_description as sbus_fd
|
|||
from storlets.gateway.common.exceptions import StorletRuntimeException, \
|
||||
StorletTimeout
|
||||
from storlets.gateway.common.logger import StorletLogger
|
||||
from storlets.gateway.common.stob import StorletResponse
|
||||
from storlets.gateway.common.stob import StorletData, StorletResponse
|
||||
|
||||
MAX_METADATA_SIZE = 4096
|
||||
|
||||
|
@ -435,12 +435,9 @@ class StorletInvocationProtocol(object):
|
|||
:param storlet_logger_path: path string to log file
|
||||
:param timeout: integer of timeout for waiting the resp from container
|
||||
:param logger: logger instance
|
||||
:param extra_sources (WIP): a list of StorletRequest instances
|
||||
which keep data_iter for adding extra source
|
||||
as data stream
|
||||
"""
|
||||
def __init__(self, srequest, storlet_pipe_path, storlet_logger_path,
|
||||
timeout, logger, extra_sources=None):
|
||||
timeout, logger):
|
||||
self.srequest = srequest
|
||||
self.storlet_pipe_path = storlet_pipe_path
|
||||
self.storlet_logger = StorletLogger(storlet_logger_path)
|
||||
|
@ -457,8 +454,7 @@ class StorletInvocationProtocol(object):
|
|||
self._input_data_write_fd = None
|
||||
|
||||
self.extra_data_sources = []
|
||||
extra_sources = extra_sources or []
|
||||
for source in extra_sources:
|
||||
for source in self.srequest.extra_data_list:
|
||||
if source.has_fd:
|
||||
# TODO(kota_): it may be data_fd in the future.
|
||||
raise Exception(
|
||||
|
@ -473,8 +469,8 @@ class StorletInvocationProtocol(object):
|
|||
"""
|
||||
File descriptor to read the input body content
|
||||
"""
|
||||
if self.srequest.has_fd:
|
||||
return self.srequest.data_fd
|
||||
if self.srequest.data.has_fd:
|
||||
return self.srequest.data.data_fd
|
||||
else:
|
||||
return self._input_data_read_fd
|
||||
|
||||
|
@ -489,16 +485,21 @@ class StorletInvocationProtocol(object):
|
|||
{'start': str(self.srequest.start),
|
||||
'end': str(self.srequest.end)})
|
||||
|
||||
fds = [SBusFileDescriptor(sbus_fd.SBUS_FD_INPUT_OBJECT,
|
||||
self.input_data_read_fd,
|
||||
storage_metadata=self.srequest.user_metadata,
|
||||
storlets_metadata=storlets_metadata),
|
||||
SBusFileDescriptor(sbus_fd.SBUS_FD_OUTPUT_OBJECT,
|
||||
self.data_write_fd),
|
||||
SBusFileDescriptor(sbus_fd.SBUS_FD_OUTPUT_OBJECT_METADATA,
|
||||
self.metadata_write_fd),
|
||||
SBusFileDescriptor(sbus_fd.SBUS_FD_LOGGER,
|
||||
self.storlet_logger.getfd())]
|
||||
fds = [
|
||||
SBusFileDescriptor(
|
||||
sbus_fd.SBUS_FD_INPUT_OBJECT,
|
||||
self.input_data_read_fd,
|
||||
storage_metadata=self.srequest.data.user_metadata,
|
||||
storlets_metadata=storlets_metadata),
|
||||
SBusFileDescriptor(
|
||||
sbus_fd.SBUS_FD_OUTPUT_OBJECT,
|
||||
self.data_write_fd),
|
||||
SBusFileDescriptor(
|
||||
sbus_fd.SBUS_FD_OUTPUT_OBJECT_METADATA,
|
||||
self.metadata_write_fd),
|
||||
SBusFileDescriptor(
|
||||
sbus_fd.SBUS_FD_LOGGER,
|
||||
self.storlet_logger.getfd())]
|
||||
|
||||
for source in self.extra_data_sources:
|
||||
fd = SBusFileDescriptor(
|
||||
|
@ -527,7 +528,7 @@ class StorletInvocationProtocol(object):
|
|||
"""
|
||||
Create all pipse used for Storlet execution
|
||||
"""
|
||||
if not self.srequest.has_fd:
|
||||
if not self.srequest.data.has_fd:
|
||||
self._input_data_read_fd, self._input_data_write_fd = os.pipe()
|
||||
self.data_read_fd, self.data_write_fd = os.pipe()
|
||||
self.metadata_read_fd, self.metadata_write_fd = os.pipe()
|
||||
|
@ -557,7 +558,7 @@ class StorletInvocationProtocol(object):
|
|||
Close all of the container side descriptors
|
||||
"""
|
||||
fds = [self.data_write_fd, self.metadata_write_fd]
|
||||
if not self.srequest.has_fd:
|
||||
if not self.srequest.data.has_fd:
|
||||
fds.append(self.input_data_read_fd)
|
||||
fds.extend([source['read_fd'] for source in self.extra_data_sources])
|
||||
for fd in fds:
|
||||
|
@ -682,7 +683,7 @@ class StorletInvocationProtocol(object):
|
|||
try:
|
||||
self._invoke()
|
||||
|
||||
if not self.srequest.has_fd:
|
||||
if not self.srequest.data.has_fd:
|
||||
self._wait_for_write_with_timeout(self._input_data_write_fd)
|
||||
|
||||
# We do the writing in a different thread.
|
||||
|
@ -695,7 +696,7 @@ class StorletInvocationProtocol(object):
|
|||
# of the Storlet writer.
|
||||
eventlet.spawn_n(self._write_input_data,
|
||||
self._input_data_write_fd,
|
||||
self.srequest.data_iter)
|
||||
self.srequest.data.data_iter)
|
||||
|
||||
for source in self.extra_data_sources:
|
||||
# NOTE(kota_): not sure right now if using eventlet.spawn_n is
|
||||
|
@ -709,11 +710,12 @@ class StorletInvocationProtocol(object):
|
|||
out_md = self._read_metadata()
|
||||
self._wait_for_read_with_timeout(self.data_read_fd)
|
||||
|
||||
return StorletResponse(out_md, data_fd=self.data_read_fd,
|
||||
cancel=self._cancel)
|
||||
data = StorletData(out_md, data_fd=self.data_read_fd,
|
||||
cancel=self._cancel)
|
||||
return StorletResponse(data)
|
||||
except Exception:
|
||||
self._close_local_side_descriptors()
|
||||
if not self.srequest.has_fd:
|
||||
if not self.srequest.data.has_fd:
|
||||
self._close_input_data_descriptors()
|
||||
raise
|
||||
|
||||
|
|
|
@ -14,7 +14,8 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from storlets.gateway.common.stob import StorletRequest, StorletResponse
|
||||
from storlets.gateway.common.stob import StorletRequest, \
|
||||
StorletResponse, StorletData
|
||||
from storlets.gateway.gateways.base import StorletGatewayBase
|
||||
|
||||
|
||||
|
@ -36,10 +37,10 @@ class StubStorletGateway(StorletGatewayBase):
|
|||
def validate_dependency_registration(cls, params, obj):
|
||||
pass
|
||||
|
||||
def indentity_invocation(self, user_metadata, data_iter):
|
||||
def indentity_invocation(self, sreq):
|
||||
self.logger.debug("Identity invocation is called")
|
||||
return StorletResponse(user_metadata, data_iter)
|
||||
data = StorletData(sreq.data.user_metadata, sreq.data.data_iter)
|
||||
return StorletResponse(data)
|
||||
|
||||
def invocation_flow(self, sreq, extra_resources=None):
|
||||
return self.indentity_invocation(sreq.user_metadata,
|
||||
sreq.data_iter)
|
||||
return self.indentity_invocation(sreq)
|
||||
|
|
|
@ -19,9 +19,10 @@ from swift.common.swob import HTTPBadRequest, Response, Range, \
|
|||
from swift.common.utils import config_true_value
|
||||
from urllib.parse import unquote
|
||||
|
||||
from storlets.gateway.common.exceptions import FileManagementError
|
||||
from storlets.gateway.common.exceptions import FileManagementError, \
|
||||
StorletRuntimeException
|
||||
from storlets.gateway.common.file_manager import FileManager
|
||||
from storlets.gateway.common.exceptions import StorletRuntimeException
|
||||
from storlets.gateway.common.stob import StorletData
|
||||
|
||||
|
||||
class NotStorletRequest(Exception):
|
||||
|
@ -440,8 +441,10 @@ class StorletBaseHandler(object):
|
|||
resp.headers['Content-Range']
|
||||
new_headers.pop('Content-Range')
|
||||
|
||||
self._set_metadata_in_headers(new_headers, sresp.user_metadata)
|
||||
response = Response(headers=new_headers, app_iter=sresp.data_iter,
|
||||
self._set_metadata_in_headers(new_headers,
|
||||
sresp.data.user_metadata)
|
||||
response = Response(headers=new_headers,
|
||||
app_iter=sresp.data.data_iter,
|
||||
request=self.request)
|
||||
except StorletRuntimeException:
|
||||
response = HTTPServiceUnavailable()
|
||||
|
@ -505,10 +508,8 @@ class StorletBaseHandler(object):
|
|||
options = self._get_storlet_invocation_options(req)
|
||||
|
||||
if hasattr(sbody_iter, '_fp'):
|
||||
sreq = self.sreq_class(storlet_id, req.params, user_metadata,
|
||||
data_fd=sbody_iter._fp.fileno(),
|
||||
options=options)
|
||||
data = StorletData(user_metadata, data_fd=sbody_iter._fp.fileno())
|
||||
else:
|
||||
sreq = self.sreq_class(storlet_id, req.params, user_metadata,
|
||||
data_iter=sbody_iter, options=options)
|
||||
return sreq
|
||||
data = StorletData(user_metadata, data_iter=sbody_iter)
|
||||
|
||||
return self.sreq_class(storlet_id, req.params, data, options=options)
|
||||
|
|
|
@ -26,6 +26,7 @@ from swift.common.wsgi import make_subrequest
|
|||
from swift.proxy.controllers.base import get_account_info
|
||||
from urllib.parse import quote
|
||||
|
||||
from storlets.gateway.common.stob import StorletData
|
||||
from storlets.swift_middleware.handlers.base import StorletBaseHandler, \
|
||||
NotStorletRequest, NotStorletExecution
|
||||
|
||||
|
@ -279,7 +280,9 @@ class StorletProxyHandler(StorletBaseHandler):
|
|||
def _call_gateway(self, resp):
|
||||
sreq = self._build_storlet_request(self.request, resp.headers,
|
||||
resp.app_iter)
|
||||
return self.gateway.invocation_flow(sreq, self.extra_sources)
|
||||
if self.extra_sources:
|
||||
sreq.extra_data_list = self.extra_sources
|
||||
return self.gateway.invocation_flow(sreq)
|
||||
|
||||
def augment_storlet_request(self, params):
|
||||
"""
|
||||
|
@ -314,8 +317,8 @@ class StorletProxyHandler(StorletBaseHandler):
|
|||
# expicially, in parallel with primary GET
|
||||
|
||||
self.extra_sources.append(
|
||||
self._build_storlet_request(
|
||||
self.request, sub_resp.headers,
|
||||
StorletData(
|
||||
self._get_user_metadata(sub_resp.headers),
|
||||
sub_resp.app_iter))
|
||||
except ValueError:
|
||||
raise HTTPBadRequest(
|
||||
|
@ -468,10 +471,12 @@ class StorletProxyHandler(StorletBaseHandler):
|
|||
sreq = self._build_storlet_request(self.request, src_resp.headers,
|
||||
src_resp.app_iter)
|
||||
self.gather_extra_sources()
|
||||
sresp = self.gateway.invocation_flow(sreq, self.extra_sources)
|
||||
data_iter = sresp.data_iter
|
||||
if self.extra_sources:
|
||||
sreq.extra_data_list = self.extra_sources
|
||||
sresp = self.gateway.invocation_flow(sreq)
|
||||
data_iter = sresp.data.data_iter
|
||||
self._set_metadata_in_headers(self.request.headers,
|
||||
sresp.user_metadata)
|
||||
sresp.data.user_metadata)
|
||||
else:
|
||||
data_iter = src_resp.app_iter
|
||||
|
||||
|
@ -510,8 +515,8 @@ class StorletProxyHandler(StorletBaseHandler):
|
|||
|
||||
sresp = self.gateway.invocation_flow(sreq)
|
||||
self._set_metadata_in_headers(self.request.headers,
|
||||
sresp.user_metadata)
|
||||
return self.handle_put_copy_response(sresp.data_iter)
|
||||
sresp.data.user_metadata)
|
||||
return self.handle_put_copy_response(sresp.data.data_iter)
|
||||
|
||||
@public
|
||||
def COPY(self):
|
||||
|
|
|
@ -21,10 +21,11 @@ from tempfile import mkdtemp
|
|||
import unittest
|
||||
from unittest import mock
|
||||
|
||||
from swift.common.swob import Request, Response
|
||||
from swift.common.swob import Response
|
||||
from swift.common.utils import FileLikeIter
|
||||
|
||||
from storlets.sbus.client import SBusResponse
|
||||
from storlets.gateway.common.stob import StorletData
|
||||
from storlets.gateway.gateways.container.gateway import ContainerStorletRequest
|
||||
from tests.unit import FakeLogger
|
||||
from tests.unit.gateway.gateways import FakeFileManager
|
||||
|
@ -37,15 +38,16 @@ class TestContainerStorletRequest(unittest.TestCase):
|
|||
storlet_id = 'Storlet-1.0.jar'
|
||||
params = {'Param1': 'Value1', 'Param2': 'Value2'}
|
||||
metadata = {'MetaKey1': 'MetaValue1', 'MetaKey2': 'MetaValue2'}
|
||||
data = StorletData(metadata, iter(StringIO()))
|
||||
|
||||
# with dependencies
|
||||
options = {'storlet_main': 'org.openstack.storlet.Storlet',
|
||||
'storlet_dependency': 'dep1,dep2',
|
||||
'storlet_language': 'java',
|
||||
'file_manager': FakeFileManager('storlet', 'dep')}
|
||||
dsreq = ContainerStorletRequest(storlet_id, params, metadata,
|
||||
iter(StringIO()), options=options)
|
||||
self.assertEqual(metadata, dsreq.user_metadata)
|
||||
dsreq = ContainerStorletRequest(storlet_id, params, data,
|
||||
options=options)
|
||||
self.assertEqual(metadata, dsreq.data.user_metadata)
|
||||
self.assertEqual(params, dsreq.params)
|
||||
self.assertEqual('Storlet-1.0.jar', dsreq.storlet_id)
|
||||
self.assertEqual('org.openstack.storlet.Storlet', dsreq.storlet_main)
|
||||
|
@ -57,9 +59,9 @@ class TestContainerStorletRequest(unittest.TestCase):
|
|||
options = {'storlet_main': 'org.openstack.storlet.Storlet',
|
||||
'storlet_language': 'java',
|
||||
'file_manager': FakeFileManager('storlet', 'dep')}
|
||||
dsreq = ContainerStorletRequest(storlet_id, params, metadata,
|
||||
iter(StringIO()), options=options)
|
||||
self.assertEqual(metadata, dsreq.user_metadata)
|
||||
dsreq = ContainerStorletRequest(storlet_id, params, data,
|
||||
options=options)
|
||||
self.assertEqual(metadata, dsreq.data.user_metadata)
|
||||
self.assertEqual(params, dsreq.params)
|
||||
self.assertEqual('Storlet-1.0.jar', dsreq.storlet_id)
|
||||
self.assertEqual('org.openstack.storlet.Storlet', dsreq.storlet_main)
|
||||
|
@ -71,35 +73,33 @@ class TestContainerStorletRequest(unittest.TestCase):
|
|||
options = {'storlet_main': 'org.openstack.storlet.Storlet',
|
||||
'file_manager': FakeFileManager('storlet', 'dep')}
|
||||
with self.assertRaises(ValueError):
|
||||
ContainerStorletRequest(storlet_id, params, metadata,
|
||||
iter(StringIO()), options=options)
|
||||
ContainerStorletRequest(storlet_id, params, data, options=options)
|
||||
|
||||
# storlet_main is not given
|
||||
options = {'storlet_language': 'java',
|
||||
'file_manager': FakeFileManager('storlet', 'dep')}
|
||||
with self.assertRaises(ValueError):
|
||||
ContainerStorletRequest(storlet_id, params, metadata,
|
||||
iter(StringIO()), options=options)
|
||||
ContainerStorletRequest(storlet_id, params, data, options=options)
|
||||
|
||||
# file_manager is not given
|
||||
options = {'storlet_main': 'org.openstack.storlet.Storlet',
|
||||
'storlet_language': 'java'}
|
||||
with self.assertRaises(ValueError):
|
||||
ContainerStorletRequest(storlet_id, params, metadata,
|
||||
iter(StringIO()), options=options)
|
||||
ContainerStorletRequest(storlet_id, params, data, options=options)
|
||||
|
||||
# Python
|
||||
storlet_id = 'storlet.py'
|
||||
params = {'Param1': 'Value1', 'Param2': 'Value2'}
|
||||
metadata = {'MetaKey1': 'MetaValue1', 'MetaKey2': 'MetaValue2'}
|
||||
data = StorletData(metadata, iter(StringIO()))
|
||||
|
||||
# without language version
|
||||
options = {'storlet_main': 'storlet.Storlet',
|
||||
'storlet_language': 'python',
|
||||
'file_manager': FakeFileManager('storlet', 'dep')}
|
||||
dsreq = ContainerStorletRequest(storlet_id, params, metadata,
|
||||
iter(StringIO()), options=options)
|
||||
self.assertEqual(metadata, dsreq.user_metadata)
|
||||
dsreq = ContainerStorletRequest(storlet_id, params, data,
|
||||
options=options)
|
||||
self.assertEqual(metadata, dsreq.data.user_metadata)
|
||||
self.assertEqual(params, dsreq.params)
|
||||
self.assertEqual('storlet.py', dsreq.storlet_id)
|
||||
self.assertEqual('storlet.Storlet', dsreq.storlet_main)
|
||||
|
@ -112,9 +112,9 @@ class TestContainerStorletRequest(unittest.TestCase):
|
|||
'storlet_language': 'python',
|
||||
'storlet_language_version': '3.6',
|
||||
'file_manager': FakeFileManager('storlet', 'dep')}
|
||||
dsreq = ContainerStorletRequest(storlet_id, params, metadata,
|
||||
iter(StringIO()), options=options)
|
||||
self.assertEqual(metadata, dsreq.user_metadata)
|
||||
dsreq = ContainerStorletRequest(storlet_id, params, data,
|
||||
options=options)
|
||||
self.assertEqual(metadata, dsreq.data.user_metadata)
|
||||
self.assertEqual(params, dsreq.params)
|
||||
self.assertEqual('storlet.py', dsreq.storlet_id)
|
||||
self.assertEqual('storlet.Storlet', dsreq.storlet_main)
|
||||
|
@ -126,14 +126,16 @@ class TestContainerStorletRequest(unittest.TestCase):
|
|||
storlet_id = 'Storlet-1.0.jar'
|
||||
params = {}
|
||||
metadata = {}
|
||||
data = StorletData(metadata, None, 0)
|
||||
|
||||
options = {'storlet_main': 'org.openstack.storlet.Storlet',
|
||||
'storlet_dependency': 'dep1,dep2',
|
||||
'storlet_language': 'java',
|
||||
'file_manager': FakeFileManager('storlet', 'dep'),
|
||||
'range_start': 1,
|
||||
'range_end': 6}
|
||||
dsreq = ContainerStorletRequest(storlet_id, params, metadata,
|
||||
None, 0, options=options)
|
||||
dsreq = ContainerStorletRequest(storlet_id, params, data,
|
||||
options=options)
|
||||
|
||||
self.assertEqual('Storlet-1.0.jar', dsreq.storlet_id)
|
||||
self.assertEqual('org.openstack.storlet.Storlet', dsreq.storlet_main)
|
||||
|
@ -149,8 +151,8 @@ class TestContainerStorletRequest(unittest.TestCase):
|
|||
'file_manager': FakeFileManager('storlet', 'dep'),
|
||||
'range_start': 0,
|
||||
'range_end': 0}
|
||||
dsreq = ContainerStorletRequest(storlet_id, params, metadata,
|
||||
None, 0, options=options)
|
||||
dsreq = ContainerStorletRequest(storlet_id, params, data,
|
||||
options=options)
|
||||
|
||||
self.assertEqual('Storlet-1.0.jar', dsreq.storlet_id)
|
||||
self.assertEqual('org.openstack.storlet.Storlet', dsreq.storlet_main)
|
||||
|
@ -168,8 +170,9 @@ class TestContainerStorletRequest(unittest.TestCase):
|
|||
'storlet_dependency': 'dep1,dep2',
|
||||
'storlet_language': 'java',
|
||||
'file_manager': FakeFileManager('storlet', 'dep')}
|
||||
dsreq = ContainerStorletRequest(storlet_id, params, metadata,
|
||||
None, 0, options=options)
|
||||
data = StorletData(metadata, None, 0)
|
||||
dsreq = ContainerStorletRequest(storlet_id, params, data,
|
||||
options=options)
|
||||
self.assertFalse(dsreq.has_range)
|
||||
|
||||
options = {'storlet_main': 'org.openstack.storlet.Storlet',
|
||||
|
@ -178,8 +181,8 @@ class TestContainerStorletRequest(unittest.TestCase):
|
|||
'file_manager': FakeFileManager('storlet', 'dep'),
|
||||
'range_start': 1,
|
||||
'range_end': 6}
|
||||
dsreq = ContainerStorletRequest(storlet_id, params, metadata,
|
||||
None, 0, options=options)
|
||||
dsreq = ContainerStorletRequest(storlet_id, params, data,
|
||||
options=options)
|
||||
self.assertTrue(dsreq.has_range)
|
||||
|
||||
options = {'storlet_main': 'org.openstack.storlet.Storlet',
|
||||
|
@ -188,8 +191,8 @@ class TestContainerStorletRequest(unittest.TestCase):
|
|||
'file_manager': FakeFileManager('storlet', 'dep'),
|
||||
'range_start': 0,
|
||||
'range_end': 6}
|
||||
dsreq = ContainerStorletRequest(storlet_id, params, metadata,
|
||||
None, 0, options=options)
|
||||
dsreq = ContainerStorletRequest(storlet_id, params, data,
|
||||
options=options)
|
||||
self.assertTrue(dsreq.has_range)
|
||||
|
||||
|
||||
|
@ -430,11 +433,14 @@ class ContainerGatewayTestMixin(object):
|
|||
'storlet_language': 'java',
|
||||
'file_manager': FakeFileManager('storlet', 'dep')}
|
||||
|
||||
data = StorletData(
|
||||
user_metadata={},
|
||||
data_iter=iter('body'))
|
||||
st_req = ContainerStorletRequest(
|
||||
storlet_id=self.sobj,
|
||||
params={},
|
||||
user_metadata={},
|
||||
data_iter=iter('body'), options=options)
|
||||
data=data,
|
||||
options=options)
|
||||
|
||||
# TODO(kota_): need more efficient way for emuration of return value
|
||||
# from SDaemon
|
||||
|
@ -487,9 +493,9 @@ class ContainerGatewayTestMixin(object):
|
|||
client.start_daemon.return_value = SBusResponse(True, 'OK')
|
||||
client.execute.return_value = SBusResponse(True, 'OK', 'someid')
|
||||
|
||||
sresp = self.gateway.invocation_flow(st_req, extra_sources)
|
||||
sresp = self.gateway.invocation_flow(st_req)
|
||||
eventlet.sleep(0.1)
|
||||
file_like = FileLikeIter(sresp.data_iter)
|
||||
file_like = FileLikeIter(sresp.data.data_iter)
|
||||
self.assertEqual(b'something', file_like.read())
|
||||
|
||||
# I hate the decorator to return an instance but to track current
|
||||
|
@ -503,11 +509,12 @@ class ContainerGatewayTestMixin(object):
|
|||
return BytesIO(b'mock'), None
|
||||
|
||||
st_req.file_manager = MockFileManager()
|
||||
st_req.extra_data_list = extra_sources
|
||||
|
||||
test_invocation_flow()
|
||||
|
||||
# ensure st_req.app_iter is drawn
|
||||
self.assertRaises(StopIteration, next, st_req.data_iter)
|
||||
self.assertRaises(StopIteration, next, st_req.data.data_iter)
|
||||
expected_mock_writer_calls = len(extra_sources) + 1
|
||||
self.assertEqual(expected_mock_writer_calls,
|
||||
len(called_fd_and_bodies))
|
||||
|
@ -518,37 +525,20 @@ class ContainerGatewayTestMixin(object):
|
|||
self._test_invocation_flow()
|
||||
|
||||
def test_invocation_flow_with_extra_sources(self):
|
||||
options = {'generate_log': False,
|
||||
'scope': 'AUTH_account',
|
||||
'storlet_main': 'org.openstack.storlet.Storlet',
|
||||
'storlet_dependency': 'dep1,dep2',
|
||||
'storlet_language': 'java',
|
||||
'file_manager': FakeFileManager('storlet', 'dep')}
|
||||
|
||||
data_sources = []
|
||||
|
||||
def generate_extra_st_request():
|
||||
# This works similarly with build_storlet_request
|
||||
# TODO(kota_): think of more generarl way w/o
|
||||
# build_storlet_request
|
||||
sw_req = Request.blank(
|
||||
self.req_path, environ={'REQUEST_METHOD': 'GET'},
|
||||
headers={'X-Run-Storlet': self.sobj})
|
||||
|
||||
def generate_extra_data():
|
||||
sw_resp = Response(
|
||||
app_iter=iter(['This is a response body']), status=200)
|
||||
|
||||
st_req = ContainerStorletRequest(
|
||||
storlet_id=sw_req.headers['X-Run-Storlet'],
|
||||
params=sw_req.params,
|
||||
data = StorletData(
|
||||
user_metadata={},
|
||||
data_iter=sw_resp.app_iter, options=options)
|
||||
data_iter=sw_resp.app_iter)
|
||||
data_sources.append(sw_resp.app_iter)
|
||||
return st_req
|
||||
return data
|
||||
|
||||
extra_request = generate_extra_st_request()
|
||||
mock_calls = self._test_invocation_flow(
|
||||
extra_sources=[extra_request])
|
||||
extra_data = generate_extra_data()
|
||||
mock_calls = self._test_invocation_flow(extra_sources=[extra_data])
|
||||
self.assertEqual('This is a response body', mock_calls[1][1])
|
||||
|
||||
# run all existing eventlet threads
|
||||
|
|
|
@ -26,6 +26,7 @@ from storlets.sbus.client import SBusResponse
|
|||
from storlets.sbus.client.exceptions import SBusClientIOError
|
||||
from storlets.gateway.common.exceptions import StorletRuntimeException, \
|
||||
StorletTimeout
|
||||
from storlets.gateway.common.stob import StorletData
|
||||
from storlets.gateway.gateways.container.gateway import ContainerStorletRequest
|
||||
from storlets.gateway.gateways.container.runtime import \
|
||||
RunTimePaths, StorletInvocationProtocol
|
||||
|
@ -249,8 +250,9 @@ class TestStorletInvocationProtocol(unittest.TestCase):
|
|||
'storlet_dependency': 'dep1,dep2',
|
||||
'storlet_language': 'java',
|
||||
'file_manager': FakeFileManager('storlet', 'dep')}
|
||||
data = StorletData({}, iter(StringIO()))
|
||||
storlet_request = ContainerStorletRequest(
|
||||
self.storlet_id, {}, {}, iter(StringIO()), options=self.options)
|
||||
self.storlet_id, {}, data, options=self.options)
|
||||
self.protocol = StorletInvocationProtocol(
|
||||
storlet_request, self.pipe_path, self.log_file, 1, self.logger)
|
||||
|
||||
|
@ -324,28 +326,37 @@ class TestStorletInvocationProtocol(unittest.TestCase):
|
|||
|
||||
def test_invocation_protocol_remote_fds(self):
|
||||
# In default, we have 4 fds in remote_fds
|
||||
data = StorletData({}, iter(StringIO()))
|
||||
storlet_request = ContainerStorletRequest(
|
||||
self.storlet_id, {}, {}, iter(StringIO()), options=self.options)
|
||||
self.storlet_id, {}, data, options=self.options)
|
||||
protocol = StorletInvocationProtocol(
|
||||
storlet_request, self.pipe_path, self.log_file, 1, self.logger)
|
||||
with protocol.storlet_logger.activate():
|
||||
self.assertEqual(4, len(protocol.remote_fds))
|
||||
|
||||
# extra_resources expands the remote_fds
|
||||
data = StorletData({}, iter(StringIO()))
|
||||
extra_data_list = [StorletData({}, iter(StringIO()))]
|
||||
storlet_request = ContainerStorletRequest(
|
||||
self.storlet_id, {}, {}, iter(StringIO()), options=self.options)
|
||||
self.storlet_id, {}, data, options=self.options,
|
||||
extra_data_list=extra_data_list)
|
||||
protocol = StorletInvocationProtocol(
|
||||
storlet_request, self.pipe_path, self.log_file, 1, self.logger,
|
||||
extra_sources=[storlet_request])
|
||||
storlet_request, self.pipe_path, self.log_file, 1, self.logger)
|
||||
with protocol.storlet_logger.activate():
|
||||
self.assertEqual(5, len(protocol.remote_fds))
|
||||
|
||||
# 2 more extra_resources expands the remote_fds
|
||||
data = StorletData({}, iter(StringIO()))
|
||||
extra_data_list = [
|
||||
StorletData({}, iter(StringIO())),
|
||||
StorletData({}, iter(StringIO())),
|
||||
StorletData({}, iter(StringIO()))
|
||||
]
|
||||
storlet_request = ContainerStorletRequest(
|
||||
self.storlet_id, {}, {}, iter(StringIO()), options=self.options)
|
||||
self.storlet_id, {}, data, options=self.options,
|
||||
extra_data_list=extra_data_list)
|
||||
protocol = StorletInvocationProtocol(
|
||||
storlet_request, self.pipe_path, self.log_file, 1, self.logger,
|
||||
extra_sources=[storlet_request] * 3)
|
||||
storlet_request, self.pipe_path, self.log_file, 1, self.logger)
|
||||
with protocol.storlet_logger.activate():
|
||||
self.assertEqual(7, len(protocol.remote_fds))
|
||||
|
||||
|
@ -401,8 +412,9 @@ class TestStorletInvocationProtocolPython(TestStorletInvocationProtocol):
|
|||
'storlet_language': 'python',
|
||||
'language_version': '3.6',
|
||||
'file_manager': FakeFileManager('storlet', 'dep')}
|
||||
data = StorletData({}, iter(StringIO()))
|
||||
storlet_request = ContainerStorletRequest(
|
||||
self.storlet_id, {}, {}, iter(StringIO()), options=self.options)
|
||||
self.storlet_id, {}, data, options=self.options)
|
||||
self.protocol = StorletInvocationProtocol(
|
||||
storlet_request, self.pipe_path, self.log_file, 1, self.logger)
|
||||
|
||||
|
|
Loading…
Reference in New Issue