This patch introduces the CSV storlet

The CSV storlet has a functional test, but was also
tested in the context of spark-storlets

Change-Id: I0442f6158b480aeecd8305ef09e8cc1171f45e5c
This commit is contained in:
Eran Rom
2016-09-18 22:44:27 +03:00
parent f9a7fb1611
commit 27c54b3ad0
17 changed files with 11071 additions and 0 deletions

View File

@@ -0,0 +1,45 @@
<!--
Copyright IBM Corp. 2015, 2015 All Rights Reserved
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
Limitations under the License.
-->
<project>
    <!-- Removes every build artifact. -->
    <target name="clean">
        <delete dir="bin" />
    </target>
    <!-- Compiles the storlet sources against the storlet common library. -->
    <target name="java">
        <mkdir dir="bin" />
        <javac srcdir="src" destdir="bin"
            classpath="../../../Engine/SCommon/bin/SCommon.jar"
            includeantruntime="false" />
    </target>
    <!-- Packages the compiled classes and moves the jar into bin. -->
    <target name="jar" depends="java">
        <jar destfile="csvstorlet-1.0.jar" basedir="bin">
            <manifest>
                <!-- Must match the package declared by CSVStorlet.java (org.openstack.storlet.csv). -->
                <attribute name="Main-Class"
                    value="org.openstack.storlet.csv.CSVStorlet" />
            </manifest>
        </jar>
        <move file="csvstorlet-1.0.jar" todir="bin" />
    </target>
    <!-- Copies the sample CSV data next to the jar. -->
    <target name="text" depends="jar">
        <copy file="meter-1MB.csv" toFile="bin/meter-1MB.csv" />
    </target>
    <!-- Full rebuild: clean, compile, package and copy the sample data. -->
    <target name="build" depends="clean, jar, text">
    </target>
</project>

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,354 @@
/*----------------------------------------------------------------------------
* Copyright IBM Corp. 2015, 2016 All Rights Reserved
* Copyright Universitat Rovira i Virgili. 2015, 2016 All Rights Reserved
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* Limitations under the License.
* ---------------------------------------------------------------------------
*/
package org.openstack.storlet.csv;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
import java.util.Map;
import org.openstack.storlet.csv.clauses.Clause;
import org.openstack.storlet.csv.clauses.ClauseIf;
import org.openstack.storlet.common.IStorlet;
import org.openstack.storlet.common.StorletException;
import org.openstack.storlet.common.StorletInputStream;
import org.openstack.storlet.common.StorletLogger;
import org.openstack.storlet.common.StorletObjectOutputStream;
import org.openstack.storlet.common.StorletOutputStream;
/**
*
* @author Josep Sampe, Yosef Moatti
*
*/
public class CSVStorlet implements IStorlet, PushdownStorletConstants, SparkIndependentStorletSQLConstants{
/**
* The current version of the storlet supports either a single "And" clause, a single "Or" clause, or a simple clause,
* where an "And" or "Or" clause is built out of 2 simple clauses.
*
* In order to have good performance we hard coded the "And" and "Or" clauses through the ands and Ors and ands/ors Part0/1 4 strings
*/
private ClauseIf theClause;
/**
 * Closes the input stream, the output stream and the reader, in that order, on a
 * best effort basis: a null argument is skipped and a failure while closing one
 * resource is printed and does not prevent the next resources from being closed.
 *
 * @param is the input stream to close, may be null
 * @param os the output stream to close, may be null
 * @param br the reader to close, may be null
 */
private void finalClose(InputStream is, OutputStream os, BufferedReader br) {
    final java.io.Closeable[] resources = { is, os, br };
    for (final java.io.Closeable resource : resources) {
        if (resource == null) {
            continue;
        }
        try {
            resource.close();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}
/**
* Storlet entry point: filters the CSV text read from the first input stream and writes the
* retained records to the first output stream.
*
* When a column selection or a where clause was requested, each line is split on ",", checked
* against the parsed clause and, if columns were selected, reduced to those columns before being
* written followed by the record separator. Counters and a completion message are logged at the end.
* The failure paths handled below and the normal completion path all close the streams via finalClose.
*
* @param inStreams the input streams; only element 0 is read
* @param outStreams the output streams; element 0 is cast to StorletObjectOutputStream
* @param parameters the request parameters, handed to ReaderEnv
* @param logger the logger handed to Utils.doubleLogPrint and to the clause evaluation
* @throws StorletException if parameter parsing, metadata writing, reader creation or record processing fails
*/
public void invoke(ArrayList<StorletInputStream> inStreams, ArrayList<StorletOutputStream> outStreams,
Map<String, String> parameters, StorletLogger logger) throws StorletException {
// The start time is kept so that the completion message can report the duration
Date theDate = Calendar.getInstance().getTime();
long startTime = theDate.getTime();
Utils.doubleLogPrint(logger,">>>>>>> Invoke starting .... at " + theDate);
// Reads the request parameters; the constructor throws StorletException on missing mandatory values
ReaderEnv env = new ReaderEnv(parameters, logger);
// Only the first input stream and the first output stream are used
StorletInputStream sis = inStreams.get(0);
InputStream inputStream = sis.getStream();
StorletObjectOutputStream storletObjectOutputStream = (StorletObjectOutputStream)outStreams.get(0);
OutputStream outputStream = storletObjectOutputStream.getStream();
int[] select;
String where_string;
BufferedReader br = null;
long processedBytes = 0;
long invalidCount = 0; // counts the number of rows which were found not valid:
long validCount = 0; // counts the number of rows which were found valid:
long probablyMalformedLines = 0; // counts the number of lines which cause an exception while being analyzed
long lineWithoutEnoughFields = 0; // counts the lines which are too short to be checked with predicate
// Placeholders reported by the completion message when no valid line was seen
String firstLine = "INITIAL_VALUE1";
String lastValidLine = "INITIAL_VALUE2";
// Despite its name, DEFAULT_FILE_ENCRYPTION holds the character encoding name
String charset = DEFAULT_FILE_ENCRYPTION;
//GET PARAMETERS
// Placeholder value, overwritten from the requested range before any record is read
long rangeBytesLeft = -2;
int maxPredicateIndex = -1;
final boolean columnSelectionActivated;
final String recordSeparator;
final byte[] recordSeparatorBytes;
int recordSeparatorBytesLen;
// Step 1: extract the column selection, the predicate and the record separator
try {
select = env.getSelectedFields();
where_string = env.getTheWhereClause();
Utils.doubleLogPrint(logger,"where_string = " + where_string + " select = " + ((select == null) ? "null" : Arrays.toString(select)) );
if (where_string != null) {
where_string = where_string.trim();
// DEBUG only: every "XYZYX" marker of the where clause is replaced by a space
where_string = where_string.replaceAll("XYZYX", " ");
}
// Parses the predicate into theClause; the returned value is documented as the maximum column index it uses
maxPredicateIndex = analysePredicate(logger, where_string);
columnSelectionActivated = select != null && select.length > 0;
recordSeparator = env.getParam(SWIFT_PUSHDOWN_STORLET_RECORD_DELIMITER, DEFAULT_RECORD_DELIMITER);
recordSeparatorBytes = recordSeparator.getBytes(charset);
recordSeparatorBytesLen = recordSeparatorBytes.length;
Utils.doubleLogPrint(logger,"columnSelectionActivated = " + columnSelectionActivated);
} catch (Exception ex) {
// Any failure here aborts the invocation after closing both streams
Utils.doubleLogPrint(logger,"Failure during input parsing" + ex.getMessage());
finalClose(inputStream, outputStream, null);
throw new StorletException(ex.getMessage());
}
// Step 2: hand the metadata of the input stream over to the output stream
try {
// Write Metadata
storletObjectOutputStream.setMetadata(sis.getMetadata());
} catch (Exception ex) {
Utils.doubleLogPrint(logger,"Failure during metadata wtiring" + ex.getMessage());
finalClose(inputStream, outputStream, null);
throw new StorletException(ex.getMessage());
}
// Step 3: wrap the input stream into a line reader using the configured charset
try {
// Init Input stream Reader
br = new BufferedReader(new InputStreamReader(inputStream, charset));
} catch (Exception ex) {
Utils.doubleLogPrint(logger,"Failure during input stream reader init" + ex.getMessage());
finalClose(inputStream, outputStream, br);
throw new StorletException(ex.getMessage());
}
String line;
// Number of bytes of the requested range, both ends included
rangeBytesLeft = env.getEndRangePosition() - env.getStartRangePosition() + 1;
// Step 4: unless this is the first partition, drop the first line of the range
try {
if (env.getIsFirstPartition() == false) {
// Discard first (possibly broken) record:
line = br.readLine();
if (line == null) {
throw new StorletException("Stream fully consumed before reading first line");
}
// readLine() strips the line terminator, so the record separator length is added back to the byte count
int bytesRead = line.getBytes(charset).length + recordSeparatorBytesLen;
processedBytes += bytesRead;
rangeBytesLeft -= bytesRead;
Utils.doubleLogPrint(logger,"Range is prefixed, following first line (broken record) is discarded from processing " + line);
}
} catch (Exception ex) {
Utils.doubleLogPrint(logger,"Failure during first line consumption" + ex.getMessage());
finalClose(inputStream, outputStream, br);
throw new StorletException(ex.getMessage());
}
// Step 5: process the remaining lines
try {
// Consume rest of content
// The loop ends when the stream is exhausted or once the remaining range budget has dropped below -1
while ( ((line = br.readLine()) != null) && (rangeBytesLeft >= -1) ) {
final byte[] lineBytes = line.getBytes(charset);
int bytesRead = lineBytes.length + recordSeparatorBytesLen;
final String[] trace_line;
rangeBytesLeft -= bytesRead;
processedBytes += bytesRead;
if (columnSelectionActivated || (where_string != null && where_string.length() > 0) ) {
// if specific columns have been chosen, or if a predicate has to be evaluated
// we have to convert the line into a vector:
trace_line = line.split(",");
// Performance note: measurements showed that split() is the most expensive operation of each
// iteration and limits the performance of the storlet; another technique (e.g. a string tokenizer)
// may be better here.
// NOTE(review): maxPredicateIndex is documented as a maximum index, so a line with exactly
// maxPredicateIndex fields passes this check; presumably the ArrayIndexOutOfBoundsException
// handler below then counts it as malformed - confirm whether "<=" was intended.
if (trace_line.length < maxPredicateIndex) {
lineWithoutEnoughFields++;
Utils.doubleLogPrint(logger,"Following line has not enough fields to be analysed with requested predicate! " + line);
continue;
}
} else {
// Neither selection nor predicate: the raw line is forwarded without being split
trace_line = null;
}
try {
// A missing or empty where clause accepts every line; otherwise the parsed clause decides
if ( where_string == null || where_string.length() == 0 || theClause.isValid(logger, trace_line)) {
final byte[] appendLine;
if (columnSelectionActivated) {
appendLine = chooseColumns(trace_line, select).getBytes(charset);
} else {
appendLine = lineBytes;
}
writeToOutputStream(logger, appendLine, outputStream, recordSeparatorBytes);
validCount++;
lastValidLine = line;
// Remember the first valid line for the completion message
if (validCount == 1) {
firstLine = line;
}
} else {
invalidCount++;
}
} catch( ArrayIndexOutOfBoundsException e) {
// A line on which a column access fails is logged and counted, then processing goes on
Utils.doubleLogPrint(logger," Following line caused ArrayIndexOutOfBoundsException\n" + ">>>>" + line + "<<<<" + "\nstackTrace=\n" + getStackTraceString(e));
probablyMalformedLines++;
}
}
// The two checks below only log diagnostics; they do not change the outcome
if (rangeBytesLeft > 0) {
Utils.doubleLogPrint(logger,"got a null line with more bytes ot read: " + String.valueOf(rangeBytesLeft));
}
if ( (line == null) && (rangeBytesLeft > 1024)) {
Utils.doubleLogPrint(logger, "Premature end of execution. line is null, however, rangeBytesLeft is " + rangeBytesLeft);
}
Utils.doubleLogPrint(logger, getCompletionMsg(startTime, null, invalidCount, validCount, probablyMalformedLines, lineWithoutEnoughFields, processedBytes, firstLine, lastValidLine, true));
} catch (UnsupportedEncodingException e) {
// Each handler logs the failure and rethrows it as a StorletException carrying only the original message
Utils.doubleLogPrint(logger,"raised UnsupportedEncodingException: " + e.getMessage());
throw new StorletException(e.getMessage());
} catch (IOException e) {
String msg = getCompletionMsg(startTime, e, invalidCount, validCount, probablyMalformedLines, lineWithoutEnoughFields, processedBytes, firstLine, lastValidLine, true);
Utils.doubleLogPrint(logger,"raised IOException: " + e.getMessage() + msg);
throw new StorletException(e.getMessage());
} catch (Exception e) {
String msg = getCompletionMsg(startTime, e, invalidCount, validCount, probablyMalformedLines, lineWithoutEnoughFields, processedBytes, firstLine, lastValidLine, true);
Utils.doubleLogPrint(logger,"raised Exception: " + e.getMessage() + msg);
throw new StorletException(e.getMessage());
} finally {
// Streams and reader are closed whether the processing succeeded or failed
finalClose(inputStream, outputStream, br);
}
}
/**
 * Builds a one-shot summary of this invocation: timing, record counters, processed
 * bytes, the first and the last valid lines and, when given, the exception.
 *
 * @param startTime the invocation start time in milliseconds
 * @param e the exception to report, or null if none occurred
 * @param invalidCount number of lines rejected by the predicate
 * @param validCount number of lines accepted
 * @param probablyMalformedLines number of lines whose analysis raised an exception
 * @param lineWithoutEnoughFields number of lines too short for the predicate
 * @param processedBytes number of bytes accounted as processed
 * @param firstLine the first valid line
 * @param lastValidLine the last valid line
 * @param shouldGoOn flag echoed in the message
 * @return the summary text
 */
private String getCompletionMsg(long startTime, Exception e, long invalidCount, long validCount, long probablyMalformedLines, long lineWithoutEnoughFields, long processedBytes, String firstLine, String lastValidLine, boolean shouldGoOn) {
    final Date now = Calendar.getInstance().getTime();
    final long elapsed = now.getTime() - startTime;
    final StringBuilder summary = new StringBuilder()
            .append(">>>> StartTime= ").append(startTime)
            .append(" duration= ").append(elapsed)
            .append(" endTime= ").append(now)
            .append(" COUNTS invalid= ").append(invalidCount)
            .append(" valid= ").append(validCount)
            .append(" probablyMalformed=").append(probablyMalformedLines)
            .append(" lineWithoutEnoughFields=").append(lineWithoutEnoughFields)
            .append(" processedBytes = ").append(processedBytes)
            .append("\nfirst line =").append(firstLine)
            .append("\nlastValidLine =").append(lastValidLine)
            .append(" shouldGoOn = ").append(shouldGoOn);
    if (e != null) {
        summary.append(" EXCEPTION occurred!!!! ").append(e);
    }
    return summary.toString();
}
/**
 * Parses the predicate string into a clause, stores it in {@code theClause} and
 * reports the highest column index the clause refers to.
 *
 * @param logger the logger used for tracing and handed to the parser
 * @param predicateString the textual predicate to parse
 * @return the maximum index in use within predicate
 */
private int analysePredicate(StorletLogger logger, String predicateString) {
    Utils.doubleLogPrint(logger," analysePredicate for predicateString = " + predicateString);
    final ClauseIf parsed = Clause.parseClause(logger, predicateString);
    theClause = parsed;
    return parsed.getMaxCol();
}
/**
 * Writes the passed bytes followed by the record separator bytes to the output stream.
 *
 * @param logger not used by this method
 * @param output the bytes of the record to write
 * @param outputStream the stream written to
 * @param recordSeparator the bytes written right after the record
 * @throws IOException if one of the two writes fails
 */
private void writeToOutputStream(final StorletLogger logger, final byte[] output, final OutputStream outputStream,
        final byte[] recordSeparator) throws IOException {
    final int recordLength = output.length;
    final int separatorLength = recordSeparator.length;
    outputStream.write(output, 0, recordLength);
    outputStream.write(recordSeparator, 0, separatorLength);
}
/**
 * Joins, with "," as separator, the fields of a split line designated by the
 * selected indexes, in the order of the selection. An index beyond the last
 * field of the line is silently skipped.
 *
 * @param trace_line the fields of the line
 * @param select the indexes of the fields to keep
 * @return the joined selection, empty if no selected index exists in the line
 * @throws IOException declared by the signature; nothing in this body raises it
 */
private String chooseColumns(String[] trace_line, int[] select) throws IOException {
    final StringBuilder projected = new StringBuilder();
    boolean separatorNeeded = false;
    for (final int column : select) {
        if (column >= trace_line.length) {
            // the line is too short for selecting specified field
            continue;
        }
        if (separatorNeeded) {
            projected.append(",");
        }
        separatorNeeded = true;
        projected.append(trace_line[column]);
    }
    return projected.toString();
}
/**
 * Renders the stack trace of the passed exception as text.
 *
 * @param t the exception whose stack trace is wanted
 * @return the text printed by {@code printStackTrace}
 */
private String getStackTraceString(Exception t) {
    final java.io.StringWriter buffer = new java.io.StringWriter();
    final java.io.PrintWriter printer = new java.io.PrintWriter(buffer);
    t.printStackTrace(printer);
    return buffer.toString();
}
}

View File

@@ -0,0 +1,60 @@
/*----------------------------------------------------------------------------
* Copyright IBM Corp. 2015, 2016 All Rights Reserved
* Copyright Universitat Rovira i Virgili. 2015, 2016 All Rights Reserved
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* Limitations under the License.
* ---------------------------------------------------------------------------
*/
package org.openstack.storlet.csv;
/**
 * Names and default values of the parameters exchanged with the CSV pushdown storlet.
 *
 * Two constants carry misleading legacy names; they are kept for existing users and
 * a correctly named alias with the same value is provided next to each of them.
 */
public interface PushdownStorletConstants {
    // NOTE(review): the Ant build in this change produces "csvstorlet-1.0.jar" (lower case) - confirm which name is expected
    public static final String SWIFT_PUSHDOWN_STORLET_NAME = "CSVStorlet-1.0.jar";
    public static final String SWIFT_PUSHDOWN_STORLET_HEADER_NAME = "X-Run-Storlet";
    public static final String SWIFT_PUSHDOWN_STORLET_PARAM_PREFIX = "X-Storlet-Parameter-";
    public static final String SWIFT_PUSHDOWN_STORLET_QUERY_SEPARATOR = ";";
    // separates parameter name and value in Spark string
    public static final String SWIFT_PUSHDOWN_STORLET_QUERY_PARAM_EQUAL = "=";
    // separates parameter name and value in storlet parameter
    public static final String SWIFT_STORLET_QUERY_PARAM_EQUAL = ":";

    // Names of the storlet parameters
    public static final String SWIFT_PUSHDOWN_STORLET_RANGE_START = "start";
    public static final String SWIFT_PUSHDOWN_STORLET_RANGE_END = "end";
    public static final String SWIFT_PUSHDOWN_STORLET_MAX_RECORD_LINE = "max_record_line";
    public static final String SWIFT_PUSHDOWN_STORLET_IS_FIRST_PARTITION = "first_partition";
    public static final String SWIFT_PUSHDOWN_STORLET_SELECTED_COLUMNS = "selected_columns";
    public static final String SWIFT_PUSHDOWN_STORLET_WHERE_CLAUSE = "where_clause";
    public static final String SWIFT_PUSHDOWN_STORLET_RECORD_DELIMITER = "delimiter";
    public static final String SWIFT_PUSHDOWN_STORLET_RECORD_COMMENT = "comment";
    public static final String SWIFT_PUSHDOWN_STORLET_RECORD_QUOTE = "quote";
    /** Name of the escape parameter. */
    public static final String SWIFT_PUSHDOWN_STORLET_RECORD_ESCAPE = "escape";
    /** Misspelled legacy name of {@link #SWIFT_PUSHDOWN_STORLET_RECORD_ESCAPE}; same value, kept for compatibility. */
    public static final String SWIFT_PUSHDOWN_STORLET_RECORD_ESCAPTE = SWIFT_PUSHDOWN_STORLET_RECORD_ESCAPE;
    public static final String SWIFT_PUSHDOWN_STORLET_DYNAMIC_DEBUG =
            "X-Storlet-DynamicDebug";
    public static final String SWIFT_PUSHDOWN_STORLET_REQUESTED_RANGE_SEPARATOR = "_";

    // Default values
    public static final String DEFAULT_RECORD_DELIMITER = "\n";
    public static final String DEFAULT_TOKEN_DELIMITER = ",";
    public static final String DEFAULT_PREDICATE = "";
    public static final String DEFAULT_DYNAMIC_DEBUG = "0";
    public static final String DEFAULT_COLUMNS = "";
    /** Name of the character encoding used by default. */
    public static final String DEFAULT_FILE_ENCODING = "UTF-8";
    /** Legacy name of {@link #DEFAULT_FILE_ENCODING}: the value is an encoding, not an encryption; kept for compatibility. */
    public static final String DEFAULT_FILE_ENCRYPTION = DEFAULT_FILE_ENCODING;
    public static final long DEFAULT_STREAM_BUFFER_LENGTH = 64 * 1024; // 64 K
    public static final String COLUMNS_SEPARATOR = ","; // in fact used in upper packages
}

View File

@@ -0,0 +1,305 @@
/*----------------------------------------------------------------------------
* Copyright IBM Corp. 2015, 2016 All Rights Reserved
* Copyright Universitat Rovira i Virgili. 2015, 2016 All Rights Reserved
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* Limitations under the License.
* ---------------------------------------------------------------------------
*/
package org.openstack.storlet.csv;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.openstack.storlet.common.StorletException;
import org.openstack.storlet.common.StorletLogger;
/**
* @author moatti
*
*/
public class ReaderEnv implements PushdownStorletConstants, SparkIndependentStorletSQLConstants {
// General parameters names:
private static final String IS_STORLET_INVOCATION = "isStorletInvocation"; // False when run from within Eclipse
public static final String FIELD_PREFIX = "";
// Data member:
/**
* The list of the requested columns
* If empty, this means that all columns are requested
*
*/
private String[] selectedFields;
private int maxSelectedIndex = -1; // specifies the highest requested column index, stays as -1 if all columns requested
private String theWhereClause;
private Map<String, String> theParamMap;
private StorletLogger logger;
private long startRangePosition; // the file offset from which the range is defined
private long endRangePosition; // the end file position for which the range is defined
private int maxRecordLine; // the maximum length of a record
private boolean fistPartition; // is this the first partition
private long dynamicDebug; // a request dependent debug value which permits to force logging of certain events
/**
 * Non Storlet invocation path: builds the environment from "name=value" arguments.
 *
 * @param args the arguments, each expected as "name=value"
 * @param logger the logger used while reading the parameters
 * @return the environment built from the arguments
 * @throws StorletException if the constructor rejects the parameters
 */
public static ReaderEnv getReaderEnv(final String[] args, final StorletLogger logger) throws StorletException {
    return new ReaderEnv(buildParamMap(args, logger, false), logger);
}
/**
 * Storlet invocation path: keeps the parameter map and the logger, then reads
 * every data member from the map.
 *
 * @param parameters the request parameters, kept by reference
 * @param logger the logger used while reading the parameters
 * @throws StorletException if a mandatory parameter is missing or malformed
 */
public ReaderEnv(final Map<String, String> parameters, final StorletLogger logger) throws StorletException {
    this.theParamMap = parameters;
    this.logger = logger;
    // both members above must be set first: they are read while the values are extracted
    setDataMemberValues(parameters);
}
/**
 * Converts "name=value" arguments into a map. An argument without "=" is ignored;
 * an argument which does not split into exactly a name and a value is logged and
 * skipped. The invocation kind is recorded in the map under its own key.
 *
 * @param args the arguments to parse
 * @param log the logger used to report badly formatted arguments
 * @param isRealStorletInvocation stored in the map as a string
 * @return a Map of the options
 */
private static Map<String, String> buildParamMap(String[] args, StorletLogger log, boolean isRealStorletInvocation) {
    final Map<String, String> options = new HashMap<String, String>();
    // Parse the passed parameters defining the behavior of the CSV parser:
    for (final String arg : args) {
        if (!arg.contains("=")) {
            continue;
        }
        final String[] nameAndValue = arg.split("=");
        if (nameAndValue.length == 2) {
            options.put(nameAndValue[0].trim(), nameAndValue[1]);
        } else {
            Utils.doubleLogPrint(log,"Seems we got a bad formatted parameter! " + arg);
        }
    }
    options.put(IS_STORLET_INVOCATION, Boolean.toString(isRealStorletInvocation));
    return options;
}
/**
 * Reads every data member from the parameter map: selected columns, dynamic debug
 * level, where clause, requested range, maximum record length and first partition flag.
 *
 * @param theMap the map the selected columns, the debug level and the where clause are read from
 * @throws StorletException if a selected column is not an integer, or if the range start, the
 *         range end, the maximum record length or the first partition flag is missing or malformed
 */
private void setDataMemberValues(final Map<String, String> theMap) throws StorletException {
    // List of selected fields: (permits to requests columns by field header name)
    final String theFields = Utils.getParam(theMap, SWIFT_PUSHDOWN_STORLET_SELECTED_COLUMNS, DEFAULT_COLUMNS, logger, false);
    if (theFields != null && theFields.length() > 0) {
        final String[] requestedFieldsStrings = theFields.split(COLUMNS_SEPARATOR);
        Utils.doubleLogPrint(logger," Requested fields: " + Arrays.toString(requestedFieldsStrings));
        // split() may return an empty array, e.g. when the value only holds separators
        if (requestedFieldsStrings.length > 0) {
            int nextIndex = 0;
            selectedFields = new String[requestedFieldsStrings.length];
            for (String nextStringNumber : requestedFieldsStrings) {
                try {
                    final int nextParsedNumber = Integer.parseInt(nextStringNumber);
                    selectedFields[nextIndex++] = getFieldName(nextParsedNumber);
                    if (nextParsedNumber > maxSelectedIndex) {
                        maxSelectedIndex = nextParsedNumber;
                    }
                } catch (NumberFormatException nfe) {
                    throw new StorletException("The argument specifying the selected fields " + theFields + " contains a column index which could not be parsed as an integer: " + nextStringNumber);
                }
            }
            Utils.doubleLogPrint(logger," max selected index 1 is " + maxSelectedIndex);
        }
    }

    // The dynamic debug level: an unparsable value silently falls back to 0
    final String dynamicDebugStr = Utils.getParam(theMap, SWIFT_PUSHDOWN_STORLET_DYNAMIC_DEBUG, DEFAULT_DYNAMIC_DEBUG, logger, false);
    try {
        Utils.doubleLogPrint(logger, "dynamicDebugStr string = " + dynamicDebugStr);
        dynamicDebug = Long.parseLong(dynamicDebugStr);
    } catch (NumberFormatException e) {
        dynamicDebug = 0;
    }

    // The WHERE clause:
    theWhereClause = Utils.getParam(theMap, SWIFT_PUSHDOWN_STORLET_WHERE_CLAUSE, DEFAULT_PREDICATE, logger, false);
    Utils.doubleLogPrint(logger, "theWhereClause = " + theWhereClause);

    // The requested range: -1 is the marker returned for a missing or unparsable value
    startRangePosition = getLongParam(SWIFT_PUSHDOWN_STORLET_RANGE_START, -1);
    if (startRangePosition == -1) {
        throw new StorletException("Start position is either missing or malformed");
    }
    endRangePosition = getLongParam(SWIFT_PUSHDOWN_STORLET_RANGE_END, -1);
    if (endRangePosition == -1) {
        throw new StorletException("End position is either missing or malformed");
    }

    // Max record line
    maxRecordLine = getIntParam(SWIFT_PUSHDOWN_STORLET_MAX_RECORD_LINE, -1);
    if (maxRecordLine == -1) {
        throw new StorletException("Max Record Line is either missing or malformed");
    }

    // First Partition
    final String firstPartitionString = Utils.getParam(theParamMap, SWIFT_PUSHDOWN_STORLET_IS_FIRST_PARTITION, "", logger);
    if (firstPartitionString.equals("")) {
        throw new StorletException("First partition is missing");
    }
    // Boolean.parseBoolean never throws and maps any text other than "true" to false,
    // so the value is checked explicitly to really reject a malformed flag.
    if ("true".equalsIgnoreCase(firstPartitionString)) {
        fistPartition = true;
    } else if ("false".equalsIgnoreCase(firstPartitionString)) {
        fistPartition = false;
    } else {
        throw new StorletException("First partition is malformed");
    }
}
/**
 * Builds the name under which a selected column is stored.
 *
 * @param colIndex the index of the requested column
 * @return the concatenation of the default selected column prefix and the requested index
 */
private String getFieldName(int colIndex) {
    final String fieldName = FIELD_PREFIX + colIndex;
    return fieldName;
}
/**
 * @return the where clause read from the parameters
 */
public String getTheWhereClause() {
    return this.theWhereClause;
}

/**
 * @return the parameter map this environment was built with (the same instance, not a copy)
 */
public Map<String, String> getTheParams() {
    return this.theParamMap;
}
/**
 * Looks up a mandatory parameter.
 *
 * @param key the parameter name
 * @return associated value
 * @throws RuntimeException if the value is missing!
 */
public String getParam(String key) {
    final String noDefault = null;
    return getParam(key, noDefault);
}

/**
 * Looks up a parameter, falling back to a default when it is missing or empty.
 *
 * @param key the parameter name
 * @param defaultValue the value used when the parameter is missing or empty; when null, a missing value raises a RuntimeException
 * @return the parameter value or the default
 */
public String getParam(String key, String defaultValue) {
    final String value = Utils.getParam(theParamMap, key, defaultValue, logger);
    return value;
}
/**
 * Reads a parameter as a long.
 *
 * @param key the parameter name
 * @param defaultValue the value returned when the parameter is missing, empty or not a valid long
 * @return the parsed value or the default
 */
public long getLongParam(String key, long defaultValue) {
    final String text = Utils.getParam(theParamMap, key, Long.toString(defaultValue), logger);
    if (text != null) {
        try {
            return Long.parseLong(text);
        } catch (NumberFormatException nfe) {
            // unparsable value: fall through to the default
        }
    }
    return defaultValue;
}

/**
 * Reads a parameter as an int.
 *
 * @param key the parameter name
 * @param defaultValue the value returned when the parameter is missing, empty or not a valid int
 * @return the parsed value or the default
 */
public int getIntParam(String key, int defaultValue) {
    final String text = Utils.getParam(theParamMap, key, Integer.toString(defaultValue), logger);
    if (text != null) {
        try {
            return Integer.parseInt(text);
        } catch (NumberFormatException nfe) {
            // unparsable value: fall through to the default
        }
    }
    return defaultValue;
}
/**
 * Returns the selected columns as integer indexes.
 *
 * @return the indexes in selection order, or null when no column was selected or
 *         when the first selected entry is "*"
 * @throws NumberFormatException if a stored entry is not an integer
 */
public int[] getSelectedFields() {
    if (selectedFields == null || selectedFields.length == 0 || selectedFields[0].equals("*")) {
        return null;
    }
    final int[] indexes = new int[selectedFields.length];
    for (int position = 0; position < indexes.length; position++) {
        indexes[position] = Integer.parseInt(selectedFields[position]);
    }
    return indexes;
}
/**
 * @return the maximum length of a record
 */
public int getMaxRecordLine() {
    return this.maxRecordLine;
}

/**
 * @return true if this is the first partition
 */
public boolean getIsFirstPartition() {
    return this.fistPartition;
}

/**
 * @return the file offset from which the range is defined
 */
public long getStartRangePosition() {
    return this.startRangePosition;
}

/**
 * @return the end file position for which the range is defined
 */
public long getEndRangePosition() {
    return this.endRangePosition;
}

/**
 * @return the request dependent debug value
 */
public long getDynamicDebug() {
    return this.dynamicDebug;
}
}

View File

@@ -0,0 +1,33 @@
/*----------------------------------------------------------------------------
* Copyright IBM Corp. 2015, 2016 All Rights Reserved
* Copyright Universitat Rovira i Virgili. 2015, 2016 All Rights Reserved
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* Limitations under the License.
* ---------------------------------------------------------------------------
*/
package org.openstack.storlet.csv;
/**
 * Operator names used to build the predicate corresponding to the WHERE clause of
 * the SQL query. Interface fields are implicitly public, static and final.
 */
public interface SparkIndependentStorletSQLConstants {
    // Combining operators
    String AND = "And";
    String OR = "Or";

    // Comparison operators
    String EQUAL = "EqualTo";
    String NOT_EQUAL = "NotEqualTo";

    // String matching operators
    String STARTS_WITH = "StringStartsWith";
    String ENDS_WITH = "StringEndsWith";
    String CONTAINS = "StringContains";
}

View File

@@ -0,0 +1,95 @@
/*----------------------------------------------------------------------------
* Copyright IBM Corp. 2015, 2016 All Rights Reserved
* Copyright Universitat Rovira i Virgili. 2015, 2016 All Rights Reserved
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* Limitations under the License.
* ---------------------------------------------------------------------------
*/
package org.openstack.storlet.csv;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Map;
import org.openstack.storlet.common.StorletLogger;
/**
* @author moatti
*
*/
public class Utils {
    /**
     * Same as the five argument variant with {@code isChar} set to false.
     *
     * @param paramMap the Map of the parameters: associates parameter names to parameter values
     * @param paramName the name of the parameter
     * @param defaultValue if null no default value and an exception will be thrown if the parameter value is missing in paramMap
     * @param log the logger the resolved value is reported to
     * @return the String value of the parameter
     */
    public static String getParam(final Map<String, String> paramMap, final String paramName, final String defaultValue, final StorletLogger log) {
        return getParam(paramMap, paramName, defaultValue, log, false);
    }

    /**
     * Resolves a parameter value, falling back to the default when the value is missing or empty.
     *
     * @param paramMap the Map of the parameters: associates parameter names to parameter values
     * @param paramName the name of the parameter
     * @param defaultValue if null no default value and an exception will be thrown if the parameter value is missing in paramMap
     * @param logger the logger the resolved value is reported to
     * @param isChar true if the parameter is a char
     * @return the String value of the parameter
     * @throws RuntimeException if the value is missing and no default was given, or if a char parameter holds more than one character
     */
    public static String getParam(final Map<String, String> paramMap, final String paramName, final String defaultValue, final StorletLogger logger, final boolean isChar) {
        String val = paramMap.get(paramName);
        if (val == null || val.length() == 0) {
            if (defaultValue == null) {
                throw new RuntimeException("Missing value for parameter " + paramName);
            }
            val = defaultValue;
        } else if (isChar && val.length() > 1) {
            throw new RuntimeException(val + " : the value given for parameter " + paramName + " should not have more than a single character!");
        }
        doubleLogPrint(logger, " value for " + paramName + " is:" + val);
        return val;
    }

    /**
     * Resolves a boolean parameter; any text other than "true" (ignoring case) yields false.
     *
     * @param parameters the Map of the parameters
     * @param paramName the name of the parameter
     * @param paramDefaultValue the value used when the parameter is missing or empty
     * @param log the logger the resolved value is reported to
     * @return the boolean value of the parameter
     */
    public static boolean getBoolean(Map<String, String> parameters, String paramName, boolean paramDefaultValue, final StorletLogger log) {
        // Boolean.toString avoids the deprecated Boolean(boolean) constructor
        final String paramValue = getParam(parameters, paramName, Boolean.toString(paramDefaultValue), log, false);
        return Boolean.parseBoolean(paramValue);
    }

    /**
     * Emits the text through the logger, when one is given, and always prints it on standard output.
     *
     * @param logger the logger to use, may be null
     * @param printString the text to emit
     */
    public static void doubleLogPrint(StorletLogger logger, String printString) {
        if (logger != null) {
            logger.emitLog(printString);
        }
        System.out.println(printString);
    }

    /**
     * Searches the arguments for the first one starting with the searched key and returns
     * the text following its first "=".
     *
     * @param args the arguments, each expected as "key=value"; null entries are skipped
     * @param searchedKey the prefix an argument must start with
     * @param missingTxt the text put in front of the error message
     * @return the value, or null if no argument starts with the searched key
     * @throws RuntimeException if the matching argument has no value
     */
    public static String extractParam(final String[] args, final String searchedKey, final String missingTxt) {
        for (String nextArg : args) {
            if (nextArg != null && nextArg.startsWith(searchedKey)) {
                // limit 2: only the first "=" separates the key from the value, so that
                // a value which itself contains "=" is returned whole
                final String[] tokens = nextArg.split("=", 2);
                if (tokens.length > 1 && tokens[1].length() > 0) {
                    return tokens[1];
                }
                throw new RuntimeException(missingTxt + ": key was found but without value!: " + nextArg);
            }
        }
        return null;
    }

    /**
     * Renders the stack trace of an exception as text.
     *
     * @param e the exception, may be null
     * @return the stack trace text, or null when no exception was given
     */
    public static String getStackTrace(Exception e) {
        if (e == null) {
            return null;
        }
        StringWriter sw = new StringWriter();
        e.printStackTrace(new PrintWriter(sw));
        return sw.toString();
    }
}

View File

@@ -0,0 +1,70 @@
/*----------------------------------------------------------------------------
* Copyright IBM Corp. 2015, 2016 All Rights Reserved
* Copyright Universitat Rovira i Virgili. 2015, 2016 All Rights Reserved
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* Limitations under the License.
* ---------------------------------------------------------------------------
*/
package org.openstack.storlet.csv.clauses;
import java.util.Arrays;
import org.openstack.storlet.common.StorletLogger;
/**
* @author moatti
*
*/
public class AndClause extends Clause {
    /**
     * Builds a conjunction: each passed string is parsed into a clause and added as a child.
     *
     * @param logger the logger handed to the parent class and to the parser
     * @param highLevelClauses the textual sub clauses, at least 2 are required
     * @throws RuntimeException if fewer than 2 sub clauses are given
     */
    public AndClause(StorletLogger logger, String[] highLevelClauses) {
        super(logger);
        final int clauseCount = highLevelClauses.length;
        if (clauseCount < 2) {
            throw new RuntimeException("AndClause necessitates at least 2 clauses! " + Arrays.toString(highLevelClauses));
        }
        for (int position = 0; position < clauseCount; position++) {
            addChild(parseClause(logger, highLevelClauses[position]));
        }
    }

    /* (non-Javadoc)
     * @see org.openstack.storlet.csv.clauses.ClauseIf#isLeaf()
     */
    @Override
    public boolean isLeaf() {
        // a conjunction always has children
        return false;
    }

    /* (non-Javadoc)
     * @see org.openstack.storlet.csv.clauses.Clause#toString()
     */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("AND of:\n");
        text.append(super.toString());
        text.append("----- AND completed -------\n");
        return text.toString();
    }

    /* (non-Javadoc)
     * @see org.openstack.storlet.csv.clauses.ClauseIf#isValid()
     */
    @Override
    public boolean isValid(StorletLogger logger, String[] trace_line) {
        // the evaluation stops at the first child which rejects the line
        for (ClauseIf child : getChildren()) {
            final boolean childHolds = child.isValid(logger, trace_line);
            if (childHolds) {
                continue;
            }
            return false;
        }
        return true;
    }
}

View File

@@ -0,0 +1,248 @@
/*----------------------------------------------------------------------------
* Copyright IBM Corp. 2015, 2016 All Rights Reserved
* Copyright Universitat Rovira i Virgili. 2015, 2016 All Rights Reserved
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* Limitations under the License.
* ---------------------------------------------------------------------------
*/
package org.openstack.storlet.csv.clauses;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.openstack.storlet.csv.Utils;
import org.openstack.storlet.common.StorletLogger;
/**
 * Common base of the clauses that make up a parsed "where" filter.
 * It stores the child clauses, offers default implementations of
 * {@link #toString()} and {@link #getMaxCol()} based on those children, and
 * hosts the parser that turns the textual filter into a clause tree.
 *
 * @author moatti
 */
public abstract class Clause implements ClauseIf {
    private static final int UNINITIALIZED = -12345;
    private static final char COMMA = ',';
    private static final String COMMA_STRING = ",";
    // A random rare string, free of commas, that temporarily stands in for the
    // commas nested inside brackets while a clause is split on its top level commas.
    private static final String COMMA_REPLACE_STRING = "HJ3Y5XMC";
    private static final String OPERANDS_START = "(";
    private static final String OPERANDS_END = ")";
    private static final Set<Character> COMMA_INHIBITORS_START = new HashSet<Character>();
    private static final Set<Character> COMMA_INHIBITORS_END = new HashSet<Character>();

    private int maxCol = UNINITIALIZED;
    // Created eagerly so that clauses without children expose an empty list
    // (as EmptyClause does) instead of null.
    private final List<ClauseIf> children = new ArrayList<ClauseIf>();
    protected StorletLogger logger;

    static {
        COMMA_INHIBITORS_START.add('(');
        COMMA_INHIBITORS_START.add('[');
        COMMA_INHIBITORS_END.add(')');
        COMMA_INHIBITORS_END.add(']');
    }

    /**
     * @param logger the logger kept in {@link #logger} for use by subclasses
     */
    public Clause(StorletLogger logger) {
        this.logger = logger;
    }

    /**
     * Appends a clause to the children of this clause.
     *
     * @param nextChild the clause to append
     */
    protected void addChild(ClauseIf nextChild) {
        children.add(nextChild);
    }

    /**
     * @return the children of this clause, in insertion order; an empty list
     *         (never null) when no child was added
     */
    @Override
    public List<ClauseIf> getChildren() {
        return children;
    }

    /**
     * @return the textual form of every child, one per line
     * @see java.lang.Object#toString()
     */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        for (ClauseIf nextChild : this.getChildren()) {
            sb.append(nextChild.toString());
            sb.append("\n");
        }
        return sb.toString();
    }

    /**
     * Computes the value on the first call and caches it afterwards.
     *
     * @return the largest value returned by getMaxCol() of the children, 0 when
     *         there is no child
     * @see org.openstack.storlet.csv.clauses.ClauseIf#getMaxCol()
     */
    @Override
    public int getMaxCol() {
        if (maxCol == UNINITIALIZED) {
            maxCol = 0;
            for (ClauseIf nextChild : this.getChildren()) {
                int newVal = nextChild.getMaxCol();
                if (newVal > maxCol)
                    maxCol = newVal;
            }
        }
        return maxCol;
    }

    /**
     * Builds the clause tree described by a textual filter.
     *
     * @param logger the logger used for tracing and handed to the created clauses
     * @param clauseStr the textual filter; null or blank yields an {@link EmptyClause}
     * @return the root of the parsed clause tree
     * @throws RuntimeException when a part of the filter cannot be parsed
     */
    public static ClauseIf parseClause(StorletLogger logger, String clauseStr) {
        // Assumption1: clauseStr appears as a list of high Level Clauses separated by the space character
        // Assumption2: within high level clauses, AND is marked by:  And(subClause1,subClause2)
        // Assumption3: within high level clauses, OR is marked by:  Or(subClause1,subClause2)
        // Here are some examples:
        // sqlContext.sql("select count(CaseID) from data where (Status like 'Closed' and CaseID like '%1%' and CaseID like '%3%')")
        //     whereClause = EqualTo(2,"Closed") StringContains(0,"1") StringContains(0,"3")
        // where (Status like 'Open' or CaseID like '%1%' or CaseID like '%3%' or CaseID like '%5%')")
        //     whereClause = Or(Or(Or(EqualTo(2,Open),StringContains(0,1)),StringContains(0,3)),StringContains(0,5))
        // where ((Status like 'Open' or CaseID like '%1%') and CaseID like '%3%')")
        //     whereClause = Or(EqualTo(2,Open),StringContains(0,1)) StringContains(0,3)
        // where ((Status like 'Closed' and CaseID like '%1%') or (CaseID like '%3%' and CaseID like '%4%' ) )")
        //     WhereClause = Or(And(EqualTo(2,Closed),StringContains(0,1)),And(StringContains(0,3),StringContains(0,4)))
        // where ((Status like 'Closed' or CaseID like '%1%') and (CaseID like '%3%' or CaseID like '%4%' ) )")
        //     WhereClause = Or(EqualTo(2,Closed),StringContains(0,1)) Or(StringContains(0,3),StringContains(0,4))
        // where ((Status like 'Closed' or CaseID like '%1%') and (CaseID like '%3%' or CaseID like '%4%' ) and (CaseID like '%5%' and CaseID like '%6%' ) )")
        //     WhereClause = Or(EqualTo(2,Closed),StringContains(0,1)) Or(StringContains(0,3),StringContains(0,4)) StringContains(0,5) StringContains(0,6)
        // where ((Status like 'Closed' and CaseID like '%1%') or (CaseID like '%3%' and CaseID like '%4%' ) )")
        //     WhereClause = Or(And(EqualTo(2,Closed),StringContains(0,1)),And(StringContains(0,3),StringContains(0,4)))
        // where ((Status like 'Closed' and CaseID like '%1%' and CaseID like '%2%') or (CaseID like '%3%' and CaseID like '%4%' ) )")
        //     WhereClause = Or(And(And(EqualTo(2,Closed),StringContains(0,1)),StringContains(0,2)),And(StringContains(0,3),StringContains(0,4)))
        if (clauseStr == null) {
            Utils.doubleLogPrint(logger,"WARNING null clause string passed to parseClause!");
            return new EmptyClause();
        }
        clauseStr = clauseStr.trim();
        if (clauseStr.length() == 0) {
            Utils.doubleLogPrint(logger,"WARNING trimmed clause string passed to parseClause is empty!");
            return new EmptyClause();
        }

        ClauseIf retClause;
        // First we find out the high level clauses separated by space characters.
        // If we find 2 or more high level clauses, we will issue an AndClause:
        String[] highLevelAndClauses = clauseStr.split(HIGH_LEVEL_AND_SEPARATOR);
        Utils.doubleLogPrint(logger, highLevelAndClauses.length + " high level clauses found for " + clauseStr);
        if (highLevelAndClauses.length > 1) {
            retClause = new AndClause(logger, highLevelAndClauses);
        } else {
            String clauseString = highLevelAndClauses[0];
            LogicalOperator op = detectLogicalOperator(clauseString);
            if (op != LogicalOperator.NONE) {
                retClause = parseLogicalClause(logger, op, clauseString);
            } else {
                retClause = new LeafClause(logger, clauseString);
            }
        }
        Utils.doubleLogPrint(logger,"parseClause for " + clauseStr + " returns\n" + retClause);
        return retClause;
    }

    /**
     * Recognizes a clause of the form Or(...) or And(...).
     * The opening parenthesis right after the label and the closing one at the
     * end are both required, so that the caller can strip them safely; anything
     * else is reported as NONE and is left to the leaf clause parser.
     */
    private static LogicalOperator detectLogicalOperator(String clauseString) {
        LogicalOperator[] candidates = { LogicalOperator.OR, LogicalOperator.AND };
        for (LogicalOperator candidate : candidates) {
            if (clauseString.startsWith(candidate.getOpLabel() + OPERANDS_START)
                    && clauseString.endsWith(OPERANDS_END)) {
                return candidate;
            }
        }
        return LogicalOperator.NONE;
    }

    /**
     * Parses a clause already recognized as op(subClause1,subClause2,...) into
     * the matching AndClause or OrClause.
     */
    private static ClauseIf parseLogicalClause(StorletLogger logger, LogicalOperator op, String clauseString) {
        // Remove the operator label and the surrounding parenthesis:
        String whereStringTmp = clauseString.substring(op.getOpLabel().length() + 1, clauseString.length() - 1);
        // Hide the commas that belong to nested clauses, so that the split
        // below only cuts between the direct sub clauses:
        whereStringTmp = replaceNonRelevantCommas(whereStringTmp);
        Utils.doubleLogPrint(logger,"Parsing whereStringOr = " + whereStringTmp);
        String[] clauseParts = whereStringTmp.split(COMMA_STRING);
        clauseParts = replaceBackCommas(logger, clauseParts);
        Utils.doubleLogPrint(logger,"The clause parts are = " + Arrays.toString(clauseParts));
        if (op == LogicalOperator.AND) {
            return new AndClause(logger, clauseParts);
        }
        return new OrClause(logger, clauseParts); // op == LogicalOperator.OR
    }

    /**
     * This method will replace all COMMA that are within () or [] with COMMA_REPLACE_STRING
     * @param whereStringOr the string to process, may be null or empty
     * @return the processed string; the argument itself when it is null or empty
     */
    private static String replaceNonRelevantCommas(String whereStringOr) {
        if (whereStringOr == null || whereStringOr.length() == 0)
            return whereStringOr;

        StringBuilder sb = new StringBuilder(whereStringOr.length());
        int encounteredInhibitors = 0; // number of brackets currently open
        for (int index = 0; index < whereStringOr.length(); index++) {
            char theChar = whereStringOr.charAt(index);
            if (COMMA_INHIBITORS_START.contains(theChar)) {
                encounteredInhibitors++;
            } else if (COMMA_INHIBITORS_END.contains(theChar)) {
                encounteredInhibitors--;
            }
            if (encounteredInhibitors > 0 && theChar == COMMA) {
                sb.append(COMMA_REPLACE_STRING);
            } else { // not an inhibited comma, we leave theChar as is:
                sb.append(theChar);
            }
        }
        return sb.toString();
    }

    /**
     * Restores the commas hidden by replaceNonRelevantCommas in every part.
     * A null part is reported through the logger and stays null in the result.
     */
    private static String[] replaceBackCommas(StorletLogger logger, String[] orParts) {
        String[] rets = new String[orParts.length];
        for (int index = 0; index < orParts.length; index++) {
            String next = orParts[index];
            if (next != null) {
                // literal replacement, no regular expression is involved
                rets[index] = next.replace(COMMA_REPLACE_STRING, COMMA_STRING);
            } else {
                Utils.doubleLogPrint(logger,"WARNING orParts[" + index + "] is null");
            }
        }
        return rets;
    }
}

View File

@@ -0,0 +1,55 @@
/*----------------------------------------------------------------------------
* Copyright IBM Corp. 2015, 2016 All Rights Reserved
* Copyright Universitat Rovira i Virgili. 2015, 2016 All Rights Reserved
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* Limitations under the License.
* ---------------------------------------------------------------------------
*/
package org.openstack.storlet.csv.clauses;
import java.util.List;
import org.openstack.storlet.common.StorletLogger;
/**
 * Contract shared by all the clauses of a parsed filter.
 *
 * @author moatti
 */
public interface ClauseIf {

    /**
     * Separator found between high level clauses; the clauses it separates
     * are combined under a parent clause that is considered as an AND clause.
     */
    String HIGH_LEVEL_AND_SEPARATOR = " ";

    /**
     * @return true iff this is a leaf Clause without any child
     */
    boolean isLeaf();

    /**
     * @return the list of the clause children
     */
    List<ClauseIf> getChildren();

    /**
     * @param logger the logger available to the evaluation
     * @param trace_line the line to evaluate
     * @return the logical evaluation of this for argument trace_line
     */
    boolean isValid(StorletLogger logger, String[] trace_line);

    /**
     * @return the maximum column index addressed within this ClauseIf object
     */
    int getMaxCol();
}

View File

@@ -0,0 +1,76 @@
/*----------------------------------------------------------------------------
* Copyright IBM Corp. 2015, 2016 All Rights Reserved
* Copyright Universitat Rovira i Virgili. 2015, 2016 All Rights Reserved
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* Limitations under the License.
* ---------------------------------------------------------------------------
*/
package org.openstack.storlet.csv.clauses;
import java.util.Collections;
import java.util.List;
import java.util.Vector;
import org.openstack.storlet.common.StorletLogger;
/**
 * Clause standing for "no filter": it has no child, addresses no column and
 * accepts every line. The class holds no per-instance state, so instances are
 * interchangeable.
 *
 * @author moatti
 */
public class EmptyClause implements ClauseIf {
    // One immutable empty list shared by all instances; there is nothing to
    // store per instance.
    private static final List<ClauseIf> NO_CHILDREN = Collections.emptyList();

    /**
     * Creates an empty clause.
     */
    public EmptyClause() {
    }

    /**
     * @return always true, an empty clause has no child
     * @see org.openstack.storlet.csv.clauses.ClauseIf#isLeaf()
     */
    @Override
    public boolean isLeaf() {
        return true;
    }

    /**
     * @return an empty, unmodifiable list
     * @see org.openstack.storlet.csv.clauses.ClauseIf#getChildren()
     */
    @Override
    public List<ClauseIf> getChildren() {
        return NO_CHILDREN;
    }

    /**
     * @return always true, whatever the line
     * @see org.openstack.storlet.csv.clauses.ClauseIf#isValid(StorletLogger, String[])
     */
    @Override
    public boolean isValid(StorletLogger logger, String[] trace_line) {
        return true;
    }

    /**
     * @return always 0
     * @see org.openstack.storlet.csv.clauses.ClauseIf#getMaxCol()
     */
    @Override
    public int getMaxCol() {
        return 0;
    }
}

View File

@@ -0,0 +1,111 @@
/*----------------------------------------------------------------------------
* Copyright IBM Corp. 2015, 2016 All Rights Reserved
* Copyright Universitat Rovira i Virgili. 2015, 2016 All Rights Reserved
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* Limitations under the License.
* ---------------------------------------------------------------------------
*/
package org.openstack.storlet.csv.clauses;
import java.util.Arrays;
import org.openstack.storlet.csv.Utils;
import org.openstack.storlet.common.StorletLogger;
/**
 * Clause without children: it applies one {@link LeafOperator} to one column
 * of a line, using the operand found in the textual clause. The textual form
 * is the operator label followed by (column,operand), e.g. EqualTo(2,Open).
 *
 * @author moatti
 */
public class LeafClause extends Clause {
    private LeafOperator op;        // the Operator of the Clause
    private int clauseColumnIndex;  // the column index of the clause
    private String clauseOperand;   // its clause operand

    /**
     * @param logger the logger used for tracing
     * @param clauseStr the textual clause, see {@link #parse(String)}
     * @throws RuntimeException when clauseStr cannot be parsed
     */
    public LeafClause(StorletLogger logger, String clauseStr) {
        super(logger);
        parse(clauseStr);
        Utils.doubleLogPrint(logger,"LeafClause constructor: " + op.getLeafOpLabel() + " for column " + clauseColumnIndex + " and with operand " + clauseOperand);
    }

    @Override
    public String toString() {
        return "Clause: op = " + op.getLeafOpLabel() + " column # =" + clauseColumnIndex + " Operand = " + clauseOperand;
    }

    /**
     * Same evaluation as {@link #isValid(StorletLogger, String[])}, kept under
     * its own name for existing callers.
     */
    public boolean isValidClause(StorletLogger logger, String[] trace_line) {
        return isValid(logger, trace_line);
    }

    /**
     * @return the column index addressed by this clause
     */
    @Override
    public int getMaxCol() {
        return clauseColumnIndex;
    }

    /**
     * Extracts the operator, the column index and the operand from the textual clause.
     * The operand is everything between the first comma and the final
     * character, so that an operand which itself contains commas is kept whole.
     *
     * @param clauseStr the textual clause, e.g. EqualTo(2,Open)
     * @throws RuntimeException when clauseStr starts with no known operator label
     * @throws IllegalArgumentException when the column or the operand is missing
     * @throws NumberFormatException when the column is not an integer
     */
    void parse(String clauseStr) {
        Utils.doubleLogPrint(logger,"LeafClause.parse for " + clauseStr);
        LeafOperator theOp = null;
        for (LeafOperator candidate : LeafOperator.values()) {
            if (clauseStr.startsWith(candidate.getLeafOpLabel())) {
                theOp = candidate;
                break;
            }
        }
        if (theOp == null) {
            throw new RuntimeException("Unexpected clause operator " + clauseStr);
        }

        // What follows the label is expected to be (column,operand):
        String arguments = clauseStr.substring(theOp.getLeafOpLabel().length());
        int firstComma = arguments.indexOf(',');
        if (firstComma < 1 || arguments.length() < firstComma + 2) {
            throw new IllegalArgumentException("Malformed leaf clause, expected "
                    + theOp.getLeafOpLabel() + "(column,operand) but got " + clauseStr);
        }

        String columnText = arguments.substring(1, firstComma); // remove the "("
        int clauseIndex;
        try {
            clauseIndex = Integer.parseInt(columnText);
        } catch (NumberFormatException e) {
            Utils.doubleLogPrint(logger,"parseClause for " + clauseStr + " encountered a NumberFormatException when trying to convert " + columnText + " to int");
            throw e;
        }

        clauseOperand = arguments.substring(firstComma + 1, arguments.length() - 1); // remove the ")"
        op = theOp;
        clauseColumnIndex = clauseIndex;
    }

    @Override
    public boolean isLeaf() {
        return true;
    }

    /**
     * Applies the operator to the operand of the clause and to the field of
     * the line found at the column of the clause.
     *
     * @param logger the logger used to report a line that is too short
     * @param trace_line the fields of the line under evaluation
     * @return the result of the operator
     * @throws ArrayIndexOutOfBoundsException when the line has no field at the
     *         column of the clause; the line and the clause are logged first
     */
    @Override
    public boolean isValid(StorletLogger logger, String[] trace_line) {
        String lineOperand;
        try {
            lineOperand = trace_line[clauseColumnIndex];
        } catch (ArrayIndexOutOfBoundsException e) {
            // trace_line cannot be null here: a null array raises a
            // NullPointerException, which is not caught by this handler.
            Utils.doubleLogPrint(logger, "ArrayIndexOutOfBoundsException occurred, for trace_line = " +
                    Arrays.toString(trace_line) + " and clause = " + this);
            throw e;
        }
        return op.isValid(clauseOperand, lineOperand);
    }
}

View File

@@ -0,0 +1,96 @@
/*----------------------------------------------------------------------------
* Copyright IBM Corp. 2015, 2016 All Rights Reserved
* Copyright Universitat Rovira i Virgili. 2015, 2016 All Rights Reserved
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* Limitations under the License.
* ---------------------------------------------------------------------------
*/
package org.openstack.storlet.csv.clauses;
/**
 * Comparison operators usable in a leaf clause. Each operator carries the
 * label that introduces it in the textual clause, knows how to evaluate itself
 * for a clause operand and a line value, and may strip the "%" wildcard(s)
 * from an operand through {@link #transform(String)}.
 */
public enum LeafOperator {
    EQUAL("EqualTo") {
        /** A null operand matches a null or blank value, otherwise plain equality. */
        @Override public boolean isValid(String clauseOp, String lineVal) {
            if (clauseOp == null)
                return lineVal == null || lineVal.trim().length() == 0;
            return clauseOp.equals(lineVal);
        }
    },
    NOT_EQUAL("NotEqualTo") {
        /** A null operand matches any non blank value, otherwise plain inequality. */
        @Override public boolean isValid(String clauseOp, String lineVal) {
            if (clauseOp == null)
                return lineVal != null && lineVal.trim().length() > 0;
            return !clauseOp.equals(lineVal);
        }
    },
    STARTS_WITH("StringStartsWith") {
        /** A null or empty operand matches everything; a null or empty value matches nothing else. */
        @Override public boolean isValid(String clauseOp, String lineVal) {
            if (clauseOp == null || clauseOp.length() == 0)
                return true;
            if (lineVal == null || lineVal.length() == 0)
                return false;
            return lineVal.startsWith(clauseOp);
        }
        /** Removes the trailing "%", if any. */
        @Override public String transform(String operand) {
            if (operand == null || !operand.endsWith(WILDCARD))
                return operand;
            return operand.substring(0, operand.length() - 1);
        }
    },
    ENDS_WITH("StringEndsWith") {
        /** A null or empty operand matches everything; a null or empty value matches nothing else. */
        @Override public boolean isValid(String clauseOp, String lineVal) {
            if (clauseOp == null || clauseOp.length() == 0)
                return true;
            if (lineVal == null || lineVal.length() == 0)
                return false;
            return lineVal.endsWith(clauseOp);
        }
        /** Removes the leading "%", if any. */
        @Override public String transform(String operand) {
            if (operand == null || !operand.startsWith(WILDCARD))
                return operand;
            return operand.substring(1);
        }
    },
    CONTAINS("StringContains") {
        /** A null or empty operand matches everything; a null or empty value matches nothing else. */
        @Override public boolean isValid(String clauseOp, String lineVal) {
            if (clauseOp == null || clauseOp.length() == 0)
                return true;
            if (lineVal == null || lineVal.length() == 0)
                return false;
            return lineVal.contains(clauseOp);
        }
        /** Removes the leading and the trailing "%" when both are present. */
        @Override public String transform(String operand) {
            if (operand == null || !operand.startsWith(WILDCARD) || !operand.endsWith(WILDCARD))
                return operand;
            // A lone "%" is at the same time the leading and the trailing
            // wildcard: nothing remains once it is removed. Without this guard
            // the substring below would be asked for the range (1, 0).
            if (operand.length() < 2)
                return "";
            return operand.substring(1, operand.length() - 1);
        }
    };

    private static final String WILDCARD = "%";

    private final String opLabel;

    LeafOperator(String label) {
        this.opLabel = label;
    }

    /**
     * @return the label that introduces this operator in a textual clause
     */
    public String getLeafOpLabel() {
        return opLabel;
    }

    /**
     * @param clauseOp the operand of the clause, may be null
     * @param lineVal the value found in the line, may be null
     * @return true iff lineVal satisfies this operator for clauseOp
     */
    public abstract boolean isValid(String clauseOp, String lineVal);

    /**
     * Strips from the operand the wildcard(s) relevant for this operator.
     * This default implementation returns the operand unchanged.
     *
     * @param operand the operand as written in the clause, may be null
     * @return the operand without its wildcard(s); null for a null operand
     */
    public String transform(String operand) {
        return operand;
    }
}

View File

@@ -0,0 +1,35 @@
/*----------------------------------------------------------------------------
* Copyright IBM Corp. 2015, 2016 All Rights Reserved
* Copyright Universitat Rovira i Virgili. 2015, 2016 All Rights Reserved
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* Limitations under the License.
* ---------------------------------------------------------------------------
*/
package org.openstack.storlet.csv.clauses;
/**
 * Logical operators that may combine clauses, each with the label that
 * introduces it in a textual clause. NONE has an empty label.
 */
public enum LogicalOperator {
    AND("And"),
    OR("Or"),
    NONE("");

    /** Label of the operator in a textual clause. */
    private final String label;

    private LogicalOperator(String text) {
        label = text;
    }

    /**
     * @return the label of this operator in a textual clause
     */
    public String getOpLabel() {
        return this.label;
    }
}

View File

@@ -0,0 +1,70 @@
/*----------------------------------------------------------------------------
* Copyright IBM Corp. 2015, 2016 All Rights Reserved
* Copyright Universitat Rovira i Virgili. 2015, 2016 All Rights Reserved
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* Limitations under the License.
* ---------------------------------------------------------------------------
*/
package org.openstack.storlet.csv.clauses;
import java.util.Arrays;
import org.openstack.storlet.common.StorletLogger;
/**
 * Logical disjunction of clauses: a line is accepted as soon as one child
 * clause accepts it.
 *
 * @author moatti
 */
public class OrClause extends Clause {

    /**
     * Parses every clause string and registers the result as a child.
     *
     * @param logger logger passed to the parent class and to the clause parser
     * @param childrenClauses textual form of the children; at least 2 are required
     * @throws RuntimeException when fewer than 2 clause strings are supplied
     */
    public OrClause(StorletLogger logger, String[] childrenClauses) {
        super(logger);
        final int clauseCount = childrenClauses.length;
        if (clauseCount < 2) {
            throw new RuntimeException("OrClause necessitates at least 2 clauses! " + Arrays.toString(childrenClauses));
        }
        for (int position = 0; position < clauseCount; position++) {
            addChild(parseClause(logger, childrenClauses[position]));
        }
    }

    /**
     * @return always false, an OR clause is built from child clauses
     * @see org.openstack.storlet.csv.clauses.ClauseIf#isLeaf()
     */
    @Override
    public boolean isLeaf() {
        return false;
    }

    /**
     * @return the textual dump of the children, framed by OR markers
     * @see org.openstack.storlet.csv.clauses.Clause#toString()
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("OR of:\n");
        text.append(super.toString());
        text.append("----- OR completed -------\n");
        return text.toString();
    }

    /**
     * Evaluates the children in order and stops at the first one that accepts the line.
     *
     * @param logger logger forwarded to the children
     * @param trace_line the fields of the line under evaluation
     * @return true iff at least one child accepts the line
     * @see org.openstack.storlet.csv.clauses.ClauseIf#isValid(StorletLogger, String[])
     */
    @Override
    public boolean isValid(StorletLogger logger, String[] trace_line) {
        boolean accepted = false;
        for (ClauseIf child : getChildren()) {
            accepted = child.isValid(logger, trace_line);
            if (accepted) {
                break;
            }
        }
        return accepted;
    }
}

View File

@@ -27,6 +27,7 @@
<fileset dir="HalfStorlet" includes="build.xml" />
<fileset dir="CompressStorlet" includes="build.xml" />
<fileset dir="ThumbnailStorlet" includes="build.xml" />
<fileset dir="CsvStorlet" includes="build.xml" />
</subant>
</sequential>
</macrodef>

View File

@@ -0,0 +1,111 @@
# Copyright (c) 2010-2016 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from swiftclient import client as c
from tests.functional.java import StorletJavaFunctionalTest
class TestCsvStorlet(StorletJavaFunctionalTest):
    """Functional tests invoking the CSV storlet on meter-1MB.csv."""

    def setUp(self):
        # Both attributes are assigned before the parent setUp runs.
        self.storlet_log = 'csvstorlet-1.0.log'
        self.additional_headers = {}
        main_class = 'org.openstack.storlet.csv.CSVStorlet'
        super(TestCsvStorlet, self).setUp(
            'CsvStorlet', 'csvstorlet-1.0.jar', main_class,
            'myobjects', 'meter-1MB.csv')

    def invoke_storlet(self, start, end,
                       first_partition, max_record_line,
                       columns_selection,
                       where_clause):
        """GET the storlet file through the storlet and return the body.

        The requested byte range runs from ``start`` to
        ``end + max_record_line``; every argument is also handed to the
        storlet as a ``name:value`` parameter header.
        """
        headers = {'X-Run-Storlet': self.storlet_name}
        headers.update(self.additional_headers)
        # Applied last, so these keys win over additional_headers.
        headers.update({
            'X-Storlet-Range':
                'bytes=%d-%d' % (start, end + max_record_line),
            'X-Storlet-Parameter-1': '%s:%s' % ('start', start),
            'X-Storlet-Parameter-2': '%s:%s' % ('end', end),
            'X-Storlet-Parameter-3':
                '%s:%s' % ('max_record_line', max_record_line),
            'X-Storlet-Parameter-4':
                '%s:%s' % ('first_partition', first_partition),
            'X-Storlet-Parameter-5':
                '%s:%s' % ('selected_columns', columns_selection),
            'X-Storlet-Parameter-6':
                '%s:%s' % ('where_clause', where_clause),
        })
        response = c.get_object(
            self.url, self.token, self.container, self.storlet_file,
            response_dict=dict(), headers=headers)
        return response[1]

    # NOTE(review): the three methods below start with an underscore, so a
    # runner that collects test* methods will not pick them up - presumably
    # on purpose; confirm.
    def _test_filter(self):
        """Selecting columns 4 and 6 must leave 2 fields per line."""
        content = self.invoke_storlet(120, 1024,
                                      'false', 512,
                                      '4,6', '')
        for row in [r for r in content.split('\n') if r]:
            self.assertEqual(len(row.split(',')), 2)

    def _test_prune(self):
        """With EqualTo(6,ESP), field 6 of every line must be ESP."""
        content = self.invoke_storlet(120, 1024,
                                      'false', 512,
                                      '',
                                      'EqualTo(6,ESP)')
        for row in [r for r in content.split('\n') if r]:
            fields = row.split(',')
            self.assertEqual(fields[6], 'ESP')

    def _test_prune_filter(self):
        """Columns 4,6 with EqualTo(6,ESP): field 1 must be ESP."""
        content = self.invoke_storlet(120, 1024,
                                      'false', 512,
                                      '4,6',
                                      'EqualTo(6,ESP)')
        for row in [r for r in content.split('\n') if r]:
            fields = row.split(',')
            self.assertEqual(fields[1], 'ESP')

    def _prune_filter_scan_with_count(self, start, stop,
                                      first, max_len,
                                      columns, where):
        """Invoke the storlet and count the non-empty lines returned.

        Field 1 of every non-empty line is asserted to be ``FRA``.
        """
        content = self.invoke_storlet(start, stop,
                                      first, max_len,
                                      columns, where)
        rows = [r for r in content.split('\n') if r]
        for row in rows:
            self.assertEqual(row.split(',')[1], 'FRA')
        return len(rows)

    def test_complete_file(self):
        """Scan the file as three byte ranges; expect 1070 lines in all."""
        max_len = 512
        partitions = [
            (58, 349557, 'true'),
            (349558, 699057, 'false'),
            (699058, 1048558, 'false'),
        ]
        total = 0
        for begin, finish, is_first in partitions:
            total += self._prune_filter_scan_with_count(
                begin, finish, is_first, max_len,
                '4,6', 'EqualTo(6,FRA)')
        self.assertEqual(total, 1070)
class TestCsvStorletOnProxy(TestCsvStorlet):
    """Same tests, with the X-Storlet-Run-On-Proxy header on each request."""

    def setUp(self):
        super(TestCsvStorletOnProxy, self).setUp()
        # Replaces the empty dict assigned by the parent setUp.
        proxy_header = {'X-Storlet-Run-On-Proxy': ''}
        self.additional_headers = proxy_header