added support for running storlets on byte-range requests (in the proxy node)

added a storlet that streams out every other input byte
added a test that checks the byte-range functionality
This commit is contained in:
root
2015-07-08 13:36:20 +03:00
parent ac5e30a124
commit 9a50d94e0a
6 changed files with 289 additions and 2 deletions

View File

@@ -142,7 +142,8 @@ class StorletHandlerMiddleware(object):
if not is_success(orig_resp.status_int):
return orig_resp
if self._is_slo_get_request(req, orig_resp, account, \
if self._is_range_request(req) == True or \
self._is_slo_get_request(req, orig_resp, account, \
container, obj) or \
self.proxy_only_storlet_execution == True:
# For SLOs, and proxy only mode
@@ -207,7 +208,8 @@ class StorletHandlerMiddleware(object):
gateway.augmentStorletRequest(req)
original_resp = req.get_response(self.app)
if self._is_slo_get_request(req, original_resp, account, \
if self._is_range_request(req) == True or \
self._is_slo_get_request(req, original_resp, account, \
container, obj) or \
self.proxy_only_storlet_execution == True:
# SLO / proxy only case:
@@ -280,6 +282,16 @@ class StorletHandlerMiddleware(object):
return req.get_response(self.app)
'''
Determines whether the request is a byte-range request
args:
req: the request
'''
def _is_range_request(self, req):
if 'Range' in req.headers:
return True
return False
'''
Determines from a GET request and its associated response
if the object is a SLO

View File

@@ -0,0 +1,45 @@
<!--
Copyright IBM Corp. 2015, 2015 All Rights Reserved
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
Limitations under the License.
-->
<project>
<target name="clean">
<delete dir="bin" />
</target>
<target name="java">
<mkdir dir="bin" />
<javac srcdir="src" destdir="bin"
classpath="../../Engine/SCommon/bin/SCommon.jar"
includeantruntime="false" />
</target>
<target name="jar" depends="java">
<jar destfile="halfstorlet-1.0.jar" basedir="bin">
<manifest>
<attribute name="Main-Class"
value="com.ibm.storlet.HalfStorlet" />
</manifest>
</jar>
<move file="halfstorlet-1.0.jar" todir="bin" />
</target>
<target name="text" depends="jar">
<echo message="abcdefghijklmonp" file="bin/source.txt" />
</target>
<target name="build" depends="jar, text">
</target>
</project>

View File

@@ -0,0 +1,84 @@
/*----------------------------------------------------------------------------
* Copyright IBM Corp. 2015, 2015 All Rights Reserved
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* Limitations under the License.
* ---------------------------------------------------------------------------
*/
/*============================================================================
07-Jul-2015 cdoron Initial implementation.
===========================================================================*/
package com.ibm.storlet.half;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.io.InputStream;
import java.io.OutputStream;
import com.ibm.storlet.common.IStorlet;
import com.ibm.storlet.common.StorletException;
import com.ibm.storlet.common.StorletInputStream;
import com.ibm.storlet.common.StorletLogger;
import com.ibm.storlet.common.StorletObjectOutputStream;
import com.ibm.storlet.common.StorletOutputStream;
public class HalfStorlet implements IStorlet
{
@Override
public void invoke( ArrayList<StorletInputStream> inputStreams,
ArrayList<StorletOutputStream> outputStreams,
Map<String, String> parameters,
StorletLogger log )
throws StorletException {
log.emitLog("HalfStorlet Invoked");
StorletInputStream sis = inputStreams.get(0);
StorletObjectOutputStream storletObjectOutputStream;
storletObjectOutputStream = (StorletObjectOutputStream)outputStreams.get(0);
storletObjectOutputStream.setMetadata(sis.getMetadata());
/*
* Copy every other byte from input stream to output stream
*/
log.emitLog("Copying every other byte");
StorletInputStream psis = (StorletInputStream)inputStreams.get(0);
InputStream is;
is = psis.getStream();
OutputStream os = storletObjectOutputStream.getStream();
try {
log.emitLog(new Date().toString() + "About to read from input");
int a;
boolean bool = true;
while ( (a = is.read()) != -1 ) {
if (bool)
os.write(a);
bool = !bool;
}
} catch (Exception e) {
log.emitLog("Copying every other byte from input stream to output stream failed: " + e.getMessage());
throw new StorletException("Copying every other byte from input stream to output stream failed: " +
e.getMessage());
} finally {
try {
is.close();
os.close();
} catch (IOException e) { }
}
log.emitLog("HalfStorlet Invocation done");
}
}

View File

@@ -0,0 +1,144 @@
'''-------------------------------------------------------------------------
Copyright IBM Corp. 2015, 2015 All Rights Reserved
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
Limitations under the License.
-------------------------------------------------------------------------'''
import os
import json
import random
import string
from sys_test_params import *
from swiftclient import client as c
from storlets_test_utils import put_storlet_containers, put_storlet_object, \
progress, progress_ln, progress_msg
'''------------------------------------------------------------------------'''
# Test Constants
HALF_PATH_TO_BUNDLE ='../StorletSamples/HalfStorlet/bin/'
HALF_STORLET_NAME='halfstorlet-1.0.jar'
HALF_SOURCE_FILE = 'source.txt'
'''------------------------------------------------------------------------'''
def put_storlet_input_object(url, token):
resp = dict()
metadata = {'X-Object-Meta-Testkey':'tester'}
f = open('%s/%s' %(HALF_PATH_TO_BUNDLE, HALF_SOURCE_FILE),'r')
c.put_object(url, token, 'myobjects', HALF_SOURCE_FILE, f,
content_type = "application/octet-stream",
headers = metadata,
response_dict = resp)
f.close()
status = resp.get('status')
assert (status == 200 or status == 201)
'''------------------------------------------------------------------------'''
def deploy_storlet(url,token):
#No need to create containers every time
#put_storlet_containers(url, token)
put_storlet_object( url, token,
HALF_STORLET_NAME,
HALF_PATH_TO_BUNDLE,
'',
'com.ibm.storlet.half.HalfStorlet')
put_storlet_input_object( url, token )
'''------------------------------------------------------------------------'''
def invoke_storlet(url, token, op, params = None, global_params = None, headers = None):
if params != None:
querystring=''
for key in params:
querystring += '%s=%s,' % (key, params[key])
querystring = querystring[:-1]
else:
querystring = None
metadata = {'X-Run-Storlet': HALF_STORLET_NAME}
if headers:
metadata.update(headers)
if op == 'GET':
# Get original object
original_headers, original_content = c.get_object(url, token,
'myobjects',
HALF_SOURCE_FILE,
response_dict=dict())
#print original_headers
file_length = int(original_headers['content-length'])
processed_headers, returned_content = c.get_object(url, token,
'myobjects',
HALF_SOURCE_FILE,
query_string = querystring,
response_dict=dict(),
headers=metadata,
resp_chunk_size = file_length)
processed_content = ''
for chunk in returned_content:
if chunk:
processed_content+=chunk
assert(original_headers['X-Object-Meta-Testkey'.lower()] == processed_headers['X-Object-Meta-Testkey'.lower()])
return processed_content
if op == 'PUT':
# PUT a random file
response = dict()
uploaded_content = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(1024))
random_md = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(32))
#content_length = 1024
content_length = None
headers = {'X-Run-Storlet': HALF_STORLET_NAME,
'X-Object-Meta-Testkey' : random_md }
c.put_object(url, token, 'myobjects', 'half_random_source', uploaded_content,
content_length, None, None, "application/octet-stream",
headers, None, None, querystring, response)
resp_headers, saved_content = c.get_object(url, token,
'myobjects',
'half_random_source',
response_dict=dict())
if params != None and params.get('double',None) == 'true':
assert(uploaded_content==saved_content[:1024])
assert(uploaded_content==saved_content[1024:])
else:
assert(uploaded_content == saved_content)
if params != None and params.get('execute',None) != None:
assert(resp_headers['X-Object-Meta-Execution result'.lower()] == '42')
assert(resp_headers['X-Object-Meta-Testkey'.lower()] == random_md)
'''------------------------------------------------------------------------'''
def main():
os_options = {'tenant_name': ACCOUNT}
url, token = c.get_auth( 'http://' + AUTH_IP + ":"
+ AUTH_PORT + '/v2.0',
ACCOUNT + ':' + USER_NAME,
PASSWORD,
os_options = os_options,
auth_version = '2.0' )
print 'Deploying Half storlet and dependencies'
deploy_storlet(url, token)
print "Invoking Half storlet on GET"
assert (invoke_storlet(url, token,'GET') == 'acegikmn')
print "Invoking Half storlet on GET with byte ranges"
assert (invoke_storlet(url, token,'GET', headers = {'range': 'bytes=5-10'}) == 'fhj')
'''------------------------------------------------------------------------'''
if __name__ == "__main__":
main()

View File

@@ -164,6 +164,7 @@ def main():
os.system('python execdep_test.py')
os.system('python identity_storlet_test.py')
os.system('python half_storlet_test.py')
os.system('python metadata_storlet_test.py')
os.system('python SLO_test.py')

View File

@@ -71,6 +71,7 @@
<fileset dir="StorletSamples/ExecDepStorlet" includes="build.xml" />
<fileset dir="StorletSamples/IdentityStorlet" includes="build.xml" />
<fileset dir="StorletSamples/TestMetadataStorlet" includes="build.xml" />
<fileset dir="StorletSamples/HalfStorlet" includes="build.xml" />
</subant>
</sequential>
</macrodef>