Add group by multiple dimensions

Added influx group by for multiple dimensions
Added vertica group by for multiple dimensions
Added tempest tests for group by with one, multiple, and all dimensions

Change-Id: I69c27198ab180e17b7636bbd0f26fc1bd5292f3b
This commit is contained in:
Ryan Brandt 2016-11-02 09:48:56 -06:00
parent d49e04515f
commit ed49d1062a
20 changed files with 601 additions and 138 deletions

View File

@ -193,11 +193,12 @@ public final class Validation {
}
}
public static void validateMetricsGroupBy(String groupBy) {
public static List<String> parseAndValidateMetricsGroupBy(String groupBy) {
if (!Strings.isNullOrEmpty(groupBy) && !"*".equals(groupBy)) {
throw Exceptions.unprocessableEntity("Invalid group_by", "Group_by must be '*' if specified");
if (!Strings.isNullOrEmpty(groupBy)) {
return COMMA_SPLITTER.splitToList(groupBy);
}
return new ArrayList<>();
}
public static void validateLifecycleState(String lifecycleState) {

View File

@ -28,6 +28,6 @@ public interface MeasurementRepo {
*/
List<Measurements> find(String tenantId, String name, Map<String, String> dimensions,
DateTime startTime, @Nullable DateTime endTime, @Nullable String offset,
int limit, Boolean mergeMetricsFlag, String groupBy)
int limit, Boolean mergeMetricsFlag, List<String> groupBy)
throws Exception;
}

View File

@ -23,7 +23,7 @@ import monasca.common.model.domain.common.AbstractEntity;
/**
* Encapsulates a metric measurements.
*/
public class Measurements extends AbstractEntity {
public class Measurements extends AbstractEntity implements Comparable<Measurements> {
private static final List<String> COLUMNS = Arrays.asList("timestamp", "value", "value_meta");
protected String name;
@ -131,4 +131,9 @@ public class Measurements extends AbstractEntity {
return String.format("Measurement [name=%s, dimensions=%s, measurements=%s]", name, dimensions,
measurements);
}
@Override
public int compareTo(Measurements other) {
return this.id.compareTo(other.getId());
}
}

View File

@ -31,6 +31,6 @@ public interface StatisticRepo {
List<Statistics> find(String tenantId, String name, Map<String, String> dimensions,
DateTime startTime, @Nullable DateTime endTime, List<String> statistics,
int period, String offset, int limit, Boolean mergeMetricsFlag,
String groupBy)
List<String> groupBy)
throws Exception;
}

View File

@ -13,6 +13,7 @@
*/
package monasca.api.infrastructure.persistence.influxdb;
import com.google.common.base.Joiner;
import com.google.inject.Inject;
import com.fasterxml.jackson.core.type.TypeReference;
@ -42,6 +43,7 @@ public class InfluxV9MeasurementRepo implements MeasurementRepo {
.getLogger(InfluxV9MeasurementRepo.class);
private final static TypeReference VALUE_META_TYPE = new TypeReference<Map<String, String>>() {};
private final static Joiner COMMA_JOINER = Joiner.on(',');
private final ApiConfig config;
private final String region;
@ -69,7 +71,7 @@ public class InfluxV9MeasurementRepo implements MeasurementRepo {
public List<Measurements> find(String tenantId, String name, Map<String, String> dimensions,
DateTime startTime, @Nullable DateTime endTime,
@Nullable String offset, int limit, Boolean mergeMetricsFlag,
String groupBy)
List<String> groupBy)
throws Exception {
String q = buildQuery(tenantId, name, dimensions, startTime, endTime,
@ -79,7 +81,7 @@ public class InfluxV9MeasurementRepo implements MeasurementRepo {
Series series = this.objectMapper.readValue(r, Series.class);
List<Measurements> measurementsList = measurementsList(series, offset, limit);
List<Measurements> measurementsList = measurementsList(series, groupBy, offset, limit);
logger.debug("Found {} metrics matching query", measurementsList.size());
@ -88,41 +90,52 @@ public class InfluxV9MeasurementRepo implements MeasurementRepo {
private String buildQuery(String tenantId, String name, Map<String, String> dimensions,
DateTime startTime, DateTime endTime, String offset, int limit,
Boolean mergeMetricsFlag, String groupBy) throws Exception {
Boolean mergeMetricsFlag, List<String> groupBy) throws Exception {
String q;
String groupByStr = "";
if ("*".equals(groupBy)) {
if (!groupBy.isEmpty()) {
groupByStr = " group by * ";
// The time column is automatically included in the results before all other columns.
q = String.format("select value, value_meta %1$s "
+ "where %2$s %3$s %4$s %5$s %6$s %7$s", //slimit 1
this.influxV9Utils.namePart(name, true),
this.influxV9Utils.privateTenantIdPart(tenantId),
this.influxV9Utils.privateRegionPart(this.region),
this.influxV9Utils.startTimePart(startTime),
this.influxV9Utils.dimPart(dimensions),
this.influxV9Utils.endTimePart(endTime),
this.influxV9Utils.groupByPart(groupBy));
} else {
if (Boolean.FALSE.equals(mergeMetricsFlag)) {
if (!this.influxV9MetricDefinitionRepo.isAtMostOneSeries(tenantId, name, dimensions)) {
throw new MultipleMetricsException(name, dimensions);
}
groupByStr = this.influxV9Utils.groupByPart();
String groupByStr = "";
if (Boolean.FALSE.equals(mergeMetricsFlag) &&
!this.influxV9MetricDefinitionRepo.isAtMostOneSeries(tenantId, name, dimensions)) {
throw new MultipleMetricsException(name, dimensions);
} else if (Boolean.FALSE.equals(mergeMetricsFlag)) {
groupByStr = " group by * ";
}
}
// The time column is automatically included in the results before all other columns.
q = String.format("select value, value_meta %1$s "
+ "where %2$s %3$s %4$s %5$s %6$s %7$s",
this.influxV9Utils.namePart(name, true),
this.influxV9Utils.privateTenantIdPart(tenantId),
this.influxV9Utils.privateRegionPart(this.region),
this.influxV9Utils.startTimePart(startTime),
this.influxV9Utils.dimPart(dimensions),
this.influxV9Utils.endTimePart(endTime),
groupByStr);
// The time column is automatically included in the results before all other columns.
q = String.format("select value, value_meta %1$s "
+ "where %2$s %3$s %4$s %5$s %6$s %7$s",
this.influxV9Utils.namePart(name, true),
this.influxV9Utils.privateTenantIdPart(tenantId),
this.influxV9Utils.privateRegionPart(this.region),
this.influxV9Utils.startTimePart(startTime),
this.influxV9Utils.dimPart(dimensions),
this.influxV9Utils.endTimePart(endTime),
groupByStr);
}
logger.debug("Measurements query: {}", q);
return q;
}
private List<Measurements> measurementsList(Series series, String offsetStr, int limit) {
private List<Measurements> measurementsList(Series series, List<String> groupBy, String offsetStr, int limit) {
List<Measurements> measurementsList = new LinkedList<>();
if (!series.isEmpty()) {
@ -149,10 +162,31 @@ public class InfluxV9MeasurementRepo implements MeasurementRepo {
continue;
}
Measurements measurements =
new Measurements(serie.getName(),
influxV9Utils.filterPrivateTags(serie.getTags()));
measurements.setId(Integer.toString(index));
Measurements lastMeasurements = null;
Measurements measurements = null;
if (!groupBy.isEmpty()) {
Map<String, String> dimensions = influxV9Utils.filterGroupByTags(
influxV9Utils.filterPrivateTags(serie.getTags()),
groupBy);
lastMeasurements = measurementsList.size() > 0 ?
measurementsList.get(measurementsList.size() - 1) : null;
if (lastMeasurements != null && lastMeasurements.getDimensions().equals(dimensions))
measurements = measurementsList.get(measurementsList.size() - 1);
}
if (measurements == null){
measurements = new Measurements(serie.getName(),
influxV9Utils.filterPrivateTags(serie.getTags()));
measurements.setId(Integer.toString(index));
}
for (String[] values : serie.getValues()) {
if (remaining_limit <= 0) {
@ -168,7 +202,7 @@ public class InfluxV9MeasurementRepo implements MeasurementRepo {
}
}
if (measurements.getMeasurements().size() > 0) {
if (measurements != lastMeasurements && measurements.getMeasurements().size() > 0) {
measurementsList.add(measurements);
}
index++;

View File

@ -279,14 +279,13 @@ public class InfluxV9MetricDefinitionRepo implements MetricDefinitionRepo {
{
String q = String.format("select value, value_meta %1$s "
+ "where %2$s %3$s %4$s %5$s %6$s %7$s slimit 1",
+ "where %2$s %3$s %4$s %5$s %6$s group by * slimit 1",
this.influxV9Utils.namePart(name, true),
this.influxV9Utils.privateTenantIdPart(tenantId),
this.influxV9Utils.privateRegionPart(this.region),
this.influxV9Utils.startTimePart(startTime),
this.influxV9Utils.dimPart(dimensions),
this.influxV9Utils.endTimePart(endTime),
this.influxV9Utils.groupByPart());
this.influxV9Utils.endTimePart(endTime));
logger.debug("Measurements query: {}", q);

View File

@ -72,7 +72,7 @@ public class InfluxV9StatisticRepo implements StatisticRepo {
public List<Statistics> find(String tenantId, String name, Map<String, String> dimensions,
DateTime startTime, @Nullable DateTime endTime,
List<String> statistics, int period, String offset, int limit,
Boolean mergeMetricsFlag, String groupBy) throws Exception {
Boolean mergeMetricsFlag, List<String> groupBy) throws Exception {
String offsetTimePart = "";
if (!Strings.isNullOrEmpty(offset)) {
@ -105,7 +105,7 @@ public class InfluxV9StatisticRepo implements StatisticRepo {
private String buildQuery(String tenantId, String name, Map<String, String> dimensions,
DateTime startTime, DateTime endTime, List<String> statistics,
int period, String offset, int limit, Boolean mergeMetricsFlag,
String groupBy)
List<String> groupBy)
throws Exception {
String offsetTimePart = "";
@ -116,27 +116,28 @@ public class InfluxV9StatisticRepo implements StatisticRepo {
String q;
if ("*".equals(groupBy) ) {
if (!groupBy.isEmpty()) {
q = String.format("select %1$s %2$s "
+ "where %3$s %4$s %5$s %6$s %7$s %8$s",
funcPart(statistics),
this.influxV9Utils.namePart(name, true),
this.influxV9Utils.privateTenantIdPart(tenantId),
this.influxV9Utils.privateRegionPart(this.region),
this.influxV9Utils.startTimePart(startTime),
this.influxV9Utils.dimPart(dimensions),
this.influxV9Utils.endTimePart(endTime),
this.influxV9Utils.periodPartWithGroupBy(period));
+ "where %3$s %4$s %5$s %6$s %7$s %8$s %9$s %10$s",
funcPart(statistics),
this.influxV9Utils.namePart(name, true),
this.influxV9Utils.privateTenantIdPart(tenantId),
this.influxV9Utils.privateRegionPart(this.region),
this.influxV9Utils.startTimePart(startTime),
this.influxV9Utils.dimPart(dimensions),
this.influxV9Utils.endTimePart(endTime),
this.influxV9Utils.timeOffsetPart(offsetTimePart),
this.influxV9Utils.periodPartWithGroupBy(period, groupBy),
this.influxV9Utils.limitPart(limit));
} else {
if (Boolean.FALSE.equals(mergeMetricsFlag) &&
!this.influxV9MetricDefinitionRepo.isAtMostOneSeries(tenantId, name, dimensions)) {
if (Boolean.FALSE.equals(mergeMetricsFlag) &&
!this.influxV9MetricDefinitionRepo.isAtMostOneSeries(tenantId, name, dimensions)) {
throw new MultipleMetricsException(name, dimensions);
throw new MultipleMetricsException(name, dimensions);
}
}
q = String.format("select %1$s %2$s "
+ "where %3$s %4$s %5$s %6$s %7$s %8$s %9$s %10$s",

View File

@ -13,6 +13,7 @@
*/
package monasca.api.infrastructure.persistence.influxdb;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
@ -34,6 +35,7 @@ public class InfluxV9Utils {
static final String OFFSET_SEPARATOR = "_";
static final Splitter
offsetSplitter = Splitter.on(OFFSET_SEPARATOR).omitEmptyStrings().trimResults();
static final Joiner COMMA_JOINER = Joiner.on(',');
public InfluxV9Utils() {
}
@ -85,9 +87,11 @@ public class InfluxV9Utils {
return sb.toString();
}
public String groupByPart() {
public String groupByPart(List<String> groupBy) {
return " group by *";
if (!groupBy.isEmpty() && !groupBy.contains("*"))
return " group by " + COMMA_JOINER.join(groupBy) + ' ';
return "group by * ";
}
@ -253,10 +257,14 @@ public class InfluxV9Utils {
}
public String periodPartWithGroupBy(int period) {
public String periodPartWithGroupBy(int period, List<String> groupBy) {
if (period <= 0) {
period = 300;
}
return period > 0 ? String.format(" group by time(%1$ds), *", period)
: " group by time(300s), *";
String periodStr = ",time(" + period + "s)";
return String.format(" group by %1$s%2$s", COMMA_JOINER.join(groupBy), periodStr);
}
public String periodPart(int period, Boolean mergeMetricsFlag) {
@ -277,6 +285,17 @@ public class InfluxV9Utils {
return filteredMap;
}
Map<String, String> filterGroupByTags(Map<String, String> tagMap, List<String> groupBy) {
Map<String, String> filteredMap = new HashMap<>(tagMap);
for (String key : tagMap.keySet()) {
if (!groupBy.contains(key))
filteredMap.remove(key);
}
return filteredMap;
}
public String threeDigitMillisTimestamp(String origTimestamp) {
final int length = origTimestamp.length();
final String timestamp;

View File

@ -24,10 +24,12 @@ import java.io.IOException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.inject.Named;
@ -50,9 +52,11 @@ public class MeasurementVerticaRepoImpl implements MeasurementRepo {
ISODateTimeFormat.dateTime().withZoneUTC();
private static final String FIND_BY_METRIC_DEF_SQL =
"SELECT %s to_hex(mes.definition_dimensions_id) as def_dims_id, "
"SELECT %s " // db hint to satisfy query
+ "%s to_hex(mes.definition_dimensions_id) as def_dims_id, " // select for groupBy if present
+ "mes.time_stamp, mes.value, mes.value_meta "
+ "FROM MonMetrics.Measurements mes "
+ "%s" // joins for group by
+ "WHERE mes.time_stamp >= :startTime "
+ "%s " // endtime and offset here
+ "AND TO_HEX(definition_dimensions_id) IN (%s) " // id subquery here
@ -86,34 +90,42 @@ public class MeasurementVerticaRepoImpl implements MeasurementRepo {
@Nullable String offset,
int limit,
Boolean mergeMetricsFlag,
String groupBy) throws MultipleMetricsException {
List<String> groupBy) throws MultipleMetricsException {
try (Handle h = db.open()) {
Map<String, Measurements> results = new HashMap<>();
if (!"*".equals(groupBy) && !Boolean.TRUE.equals(mergeMetricsFlag)) {
if (groupBy.isEmpty() && !Boolean.TRUE.equals(mergeMetricsFlag)) {
MetricQueries.checkForMultipleDefinitions(h, tenantId, name, dimensions);
}
StringBuilder sb = new StringBuilder();
StringBuilder endtimeAndOffsetSql = new StringBuilder();
if (endTime != null) {
sb.append(" and mes.time_stamp <= :endTime");
endtimeAndOffsetSql.append(" and mes.time_stamp <= :endTime");
}
String concatGroupByString = MetricQueries.buildGroupByConcatString(groupBy);
if (offset != null && !offset.isEmpty()) {
if ("*".equals(groupBy)) {
if (!groupBy.isEmpty() && groupBy.contains("*")) {
sb.append(" and (TO_HEX(mes.definition_dimensions_id) > :offset_id "
endtimeAndOffsetSql.append(" and (TO_HEX(mes.definition_dimensions_id) > :offset_id "
+ "or (TO_HEX(mes.definition_dimensions_id) = :offset_id and mes.time_stamp > :offset_timestamp)) ");
} else if (!groupBy.isEmpty()){
endtimeAndOffsetSql.append(" AND (").append(concatGroupByString)
.append(" > :offset_id OR (").append(concatGroupByString)
.append(" = :offset_id AND mes.time_stamp > :offset_timestamp)) ");
} else {
sb.append(" and mes.time_stamp > :offset_timestamp ");
endtimeAndOffsetSql.append(" and mes.time_stamp > :offset_timestamp ");
}
@ -122,17 +134,27 @@ public class MeasurementVerticaRepoImpl implements MeasurementRepo {
String orderById = "";
if (Boolean.FALSE.equals(mergeMetricsFlag)) {
orderById = "mes.definition_dimensions_id,";
if (!groupBy.isEmpty() && !groupBy.contains("*")) {
orderById += MetricQueries.buildGroupByCommaString(groupBy) + ',';
}
if (orderById.isEmpty())
orderById += "mes.definition_dimensions_id,";
}
String groupBySelect = concatGroupByString;
if (!groupBySelect.isEmpty())
groupBySelect += " as dimension_values, ";
String sql = String.format(FIND_BY_METRIC_DEF_SQL,
this.dbHint,
sb,
MetricQueries.buildMetricDefinitionSubSql(name, dimensions,
null, null),
orderById);
String sql = String.format(
FIND_BY_METRIC_DEF_SQL,
this.dbHint,
groupBySelect,
MetricQueries.buildGroupBySql(groupBy),
endtimeAndOffsetSql,
MetricQueries.buildMetricDefinitionSubSql(name, dimensions, null, null),
orderById);
logger.debug(sql);
Query<Map<String, Object>> query = h.createQuery(sql)
.bind("tenantId", tenantId)
@ -152,6 +174,12 @@ public class MeasurementVerticaRepoImpl implements MeasurementRepo {
}
if (!groupBy.isEmpty() && !groupBy.contains("*")) {
logger.debug("binding groupBy: {}", groupBy);
MetricQueries.bindGroupBy(query, groupBy);
}
if (offset != null && !offset.isEmpty()) {
logger.debug("binding offset: {}", offset);
@ -165,7 +193,7 @@ public class MeasurementVerticaRepoImpl implements MeasurementRepo {
return new ArrayList<>();
}
if ("*".equals(groupBy)) {
if (!groupBy.isEmpty() && groupBy.contains("*")) {
String currentDefId = null;
@ -186,6 +214,31 @@ public class MeasurementVerticaRepoImpl implements MeasurementRepo {
MetricQueries.addDefsToResults(results, h, this.dbHint);
} else if (!groupBy.isEmpty()) {
String currentId = null;
for (Map<String, Object> row : rows) {
String dimensionValues = (String) row.get("dimension_values");
if (dimensionValues != null && !dimensionValues.equals(currentId)) {
currentId = dimensionValues;
Measurements tmp = new Measurements();
tmp.setId(dimensionValues);
tmp.setName(name);
tmp.setDimensions(MetricQueries.combineGroupByAndValues(groupBy, dimensionValues));
results.put(dimensionValues, tmp);
}
List<Object> measurement = parseRow(row);
results.get(dimensionValues).addMeasurement(measurement);
}
} else {
Measurements firstMeasurement = new Measurements();
@ -216,7 +269,10 @@ public class MeasurementVerticaRepoImpl implements MeasurementRepo {
}
return new ArrayList<>(results.values());
List<Measurements> returnValue = new ArrayList<>(results.values());
Collections.sort(returnValue);
return returnValue;
}
}

View File

@ -23,6 +23,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.joda.time.DateTime;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.Query;
@ -35,8 +36,8 @@ import monasca.api.domain.model.measurement.Measurements;
*/
final class MetricQueries {
private static final Splitter BAR_SPLITTER = Splitter.on('|').omitEmptyStrings().trimResults();
private static final char OFFSET_SEPARATOR = '_';
private static final Splitter offsetSplitter = Splitter.on(OFFSET_SEPARATOR).omitEmptyStrings().trimResults();
private static final Splitter UNDERSCORE_SPLITTER = Splitter.on('_').omitEmptyStrings().trimResults();
private static final Splitter COMMA_SPLITTER = Splitter.on(',').omitEmptyStrings().trimResults();
static final String FIND_METRIC_DEFS_SQL =
"SELECT %s TO_HEX(defDims.id) as defDimsId, def.name, dims.name as dName, dims.value AS dValue "
@ -189,7 +190,7 @@ final class MetricQueries {
}
static void bindOffsetToQuery(Query<Map<String, Object>> query, String offset) {
List<String> offsets = offsetSplitter.splitToList(offset);
List<String> offsets = UNDERSCORE_SPLITTER.splitToList(offset);
if (offsets.size() > 1) {
query.bind("offset_id", offsets.get(0));
query.bind("offset_timestamp",
@ -291,4 +292,65 @@ final class MetricQueries {
}
}
static Map<String, String> combineGroupByAndValues(List<String> groupBy, String valueStr) {
List<String> values = COMMA_SPLITTER.splitToList(valueStr);
Map<String, String> newDimensions = new HashMap<>();
for (int i = 0; i < groupBy.size(); i++) {
newDimensions.put(groupBy.get(i), values.get(i));
}
return newDimensions;
}
static String buildGroupByConcatString(List<String> groupBy) {
if (groupBy.isEmpty() || "*".equals(groupBy.get(0)))
return "";
String select = "(";
for (int i = 0; i < groupBy.size(); i++) {
if (i > 0)
select += " || ',' || ";
select += "gb" + i + ".value";
}
select += ")";
return select;
}
static String buildGroupByCommaString(List<String> groupBy) {
String result = "";
if (!groupBy.contains("*")) {
for (int i = 0; i < groupBy.size(); i++) {
if (i > 0) {
result += ',';
}
result += "gb" + i + ".value";
}
}
return result;
}
static String buildGroupBySql(List<String> groupBy) {
if (groupBy.isEmpty() || "*".equals(groupBy.get(0)))
return "";
StringBuilder groupBySql = new StringBuilder(
" JOIN MonMetrics.DefinitionDimensions as dd on dd.id = mes.definition_dimensions_id ");
for (int i = 0; i < groupBy.size(); i++) {
groupBySql.append("JOIN (SELECT dimension_set_id,value FROM MonMetrics.Dimensions WHERE name = ");
groupBySql.append(":groupBy").append(i).append(") as gb").append(i);
groupBySql.append(" ON gb").append(i).append(".dimension_set_id = dd.dimension_set_id ");
}
return groupBySql.toString();
}
static void bindGroupBy(Query<Map<String, Object>> query, List<String> groupBy) {
int i = 0;
for (String value: groupBy) {
query.bind("groupBy" + i, value);
i++;
}
}
}

View File

@ -68,7 +68,7 @@ public class StatisticVerticaRepoImpl implements StatisticRepo {
String offset,
int limit,
Boolean mergeMetricsFlag,
String groupBy) throws MultipleMetricsException {
List<String> groupBy) throws MultipleMetricsException {
Map<String, Statistics> statisticsMap = new HashMap<>();
@ -77,14 +77,14 @@ public class StatisticVerticaRepoImpl implements StatisticRepo {
try (Handle h = db.open()) {
if (!"*".equals(groupBy) && !Boolean.TRUE.equals(mergeMetricsFlag)) {
if (groupBy.isEmpty() && !Boolean.TRUE.equals(mergeMetricsFlag)) {
MetricQueries.checkForMultipleDefinitions(h, tenantId, name, dimensions);
}
String sql = createQuery(name, dimensions, period, startTime, endTime, offset,
statisticsCols, mergeMetricsFlag);
statisticsCols, mergeMetricsFlag, groupBy);
logger.debug("vertica sql: {}", sql);
@ -102,6 +102,10 @@ public class StatisticVerticaRepoImpl implements StatisticRepo {
MetricQueries.bindDimensionsToQuery(query, dimensions);
if (!groupBy.isEmpty()) {
MetricQueries.bindGroupBy(query, groupBy);
}
if (offset != null && !offset.isEmpty()) {
logger.debug("binding offset: {}", offset);
@ -114,7 +118,7 @@ public class StatisticVerticaRepoImpl implements StatisticRepo {
return new ArrayList<>();
}
if ("*".equals(groupBy)) {
if (!groupBy.isEmpty() && groupBy.contains("*")) {
String currentDefId = null;
@ -138,6 +142,31 @@ public class StatisticVerticaRepoImpl implements StatisticRepo {
MetricQueries.addDefsToResults(statisticsMap, h, this.dbHint);
} else if (!groupBy.isEmpty()) {
String currentId = null;
for (Map<String, Object> row : rows) {
String dimensionValues = (String) row.get("dimension_values");
if (dimensionValues != null && !dimensionValues.equals(currentId)) {
currentId = dimensionValues;
Statistics tmp = new Statistics();
tmp.setId(dimensionValues);
tmp.setName(name);
tmp.setDimensions(MetricQueries.combineGroupByAndValues(groupBy, dimensionValues));
statisticsMap.put(dimensionValues, tmp);
}
List<Object> statisticsRow = parseRow(row);
statisticsMap.get(dimensionValues).addMeasurement(statisticsRow);
}
} else {
Statistics statistics = new Statistics();
@ -173,7 +202,11 @@ public class StatisticVerticaRepoImpl implements StatisticRepo {
}
return new ArrayList<>(statisticsMap.values());
List<Statistics> results = new ArrayList<>(statisticsMap.values());
Collections.sort(results);
return results;
}
private List<Object> parseRow(Map<String, Object> row) {
@ -235,11 +268,18 @@ public class StatisticVerticaRepoImpl implements StatisticRepo {
DateTime endTime,
String offset,
List<String> statistics,
Boolean mergeMetricsFlag) {
Boolean mergeMetricsFlag,
List<String> groupBy) {
StringBuilder sb = new StringBuilder();
sb.append("SELECT ").append(this.dbHint).append(" ");
if (!groupBy.isEmpty() && !groupBy.contains("*")) {
sb.append(MetricQueries.buildGroupByConcatString(groupBy));
sb.append(" as dimension_values, ");
}
sb.append(" max(to_hex(definition_dimensions_id)) AS id, ");
sb.append(createColumnsStr(statistics));
@ -248,21 +288,44 @@ public class StatisticVerticaRepoImpl implements StatisticRepo {
sb.append(", 'SECOND', 'START') AS time_interval");
}
sb.append(" FROM MonMetrics.Measurements ");
sb.append(" FROM MonMetrics.Measurements as mes ");
if (!groupBy.isEmpty() && !groupBy.contains("*")) {
sb.append(MetricQueries.buildGroupBySql(groupBy));
}
sb.append("WHERE TO_HEX(definition_dimensions_id) IN (")
.append(MetricQueries.buildMetricDefinitionSubSql(name, dimensions, null, null))
.append(") ");
sb.append(createWhereClause(startTime, endTime, offset, mergeMetricsFlag));
sb.append(createWhereClause(startTime, endTime, offset, groupBy));
if (period >= 1) {
sb.append(" group by ");
if (Boolean.FALSE.equals(mergeMetricsFlag)) {
if (!groupBy.isEmpty() && groupBy.contains("*")) {
sb.append("definition_dimensions_id, ");
} else if (!groupBy.isEmpty()) {
for (int i = 0; i < groupBy.size(); i++) {
sb.append("gb").append(i).append(".value,");
}
}
sb.append("time_interval ");
sb.append(" order by ");
if (Boolean.FALSE.equals(mergeMetricsFlag)) {
if (!groupBy.isEmpty() && groupBy.contains("*")) {
sb.append("to_hex(definition_dimensions_id),");
} else {
sb.append(MetricQueries.buildGroupByCommaString(groupBy));
if (!groupBy.isEmpty())
sb.append(',');
}
sb.append("time_interval ");
}
@ -276,7 +339,7 @@ public class StatisticVerticaRepoImpl implements StatisticRepo {
DateTime startTime,
DateTime endTime,
String offset,
Boolean mergeMetricsFlag) {
List<String> groupBy) {
String s = "";
@ -288,9 +351,16 @@ public class StatisticVerticaRepoImpl implements StatisticRepo {
if (offset != null && !offset.isEmpty()) {
if (Boolean.FALSE.equals(mergeMetricsFlag)) {
if (!groupBy.isEmpty()) {
s += " AND (TO_HEX(definition_dimensions_id) > :offset_id "
+ "OR (TO_HEX(definition_dimensions_id) = :offset_id AND time_stamp > :offset_timestamp)) ";
} else if (!groupBy.isEmpty()){
String concatGroupByString = MetricQueries.buildGroupByConcatString(groupBy);
s += " AND (" + concatGroupByString + " > :offset_id" +
" OR (" + concatGroupByString + " = :offset_id AND mes.time_stamp > :offset_timestamp)) ";
} else {
s += " AND time_stamp > :offset_timestamp ";
}
@ -307,7 +377,7 @@ public class StatisticVerticaRepoImpl implements StatisticRepo {
for (String statistic : statistics) {
sb.append(statistic + "(value) as " + statistic + ", ");
sb.append(statistic + "(mes.value) as " + statistic + ", ");
}
return sb.toString();

View File

@ -74,7 +74,7 @@ public class MeasurementResource {
@QueryParam("limit") String limit,
@QueryParam("tenant_id") String crossTenantId,
@QueryParam("merge_metrics") String mergeMetricsFlag,
@QueryParam("group_by") String groupBy) throws Exception {
@QueryParam("group_by") String groupByStr) throws Exception {
// Validate query parameters
DateTime startTime = Validation.parseAndValidateDate(startTimeStr, "start_time", true);
@ -86,7 +86,7 @@ public class MeasurementResource {
.parseAndValidateDimensions(dimensionsStr);
MetricNameValidation.validate(name, true);
Boolean mergeMetricsFlagBool = Validation.validateAndParseMergeMetricsFlag(mergeMetricsFlag);
Validation.validateMetricsGroupBy(groupBy);
List<String> groupBy = Validation.parseAndValidateMetricsGroupBy(groupByStr);
String queryTenantId = Validation.getQueryProject(roles, crossTenantId, tenantId, admin_role);

View File

@ -80,7 +80,7 @@ public class StatisticResource {
@QueryParam("limit") String limit,
@QueryParam("tenant_id") String crossTenantId,
@QueryParam("merge_metrics") String mergeMetricsFlag,
@QueryParam("group_by") String groupBy) throws Exception {
@QueryParam("group_by") String groupByStr) throws Exception {
// Validate query parameters
Validation.validateNotNullOrEmpty(name, "name");
@ -96,7 +96,7 @@ public class StatisticResource {
.parseAndValidateDimensions(dimensionsStr);
MetricNameValidation.validate(name, true);
Boolean mergeMetricsFlagBool = Validation.validateAndParseMergeMetricsFlag(mergeMetricsFlag);
Validation.validateMetricsGroupBy(groupBy);
List<String> groupBy = Validation.parseAndValidateMetricsGroupBy(groupByStr);
String queryTenantId = Validation.getQueryProject(roles, crossTenantId, tenantId, admin_role);

View File

@ -58,7 +58,7 @@ public class StatisticResourceTest extends AbstractMonApiResourceTest {
.header("X-Tenant-Id", "abc").get(ClientResponse.class);
verify(statisticRepo).find(anyString(), anyString(), any(Map.class), any(DateTime.class),
any(DateTime.class), any(List.class), anyInt(), any(String.class), anyInt(),
anyBoolean(), anyString());
anyBoolean(), any(List.class));
}
public void queryShouldThrowOnInvalidDateFormat() throws Exception {

View File

@ -343,6 +343,12 @@ class MetricsRepository(metrics_repository.AbstractMetricsRepository):
if not result:
return json_measurement_list
offset_id = 0
if offset is not None:
offset_tuple = offset.split('_')
offset_id = int(offset_tuple[0]) if len(offset_tuple) > 1 else 0
index = offset_id
for serie in result.raw['series']:
if 'values' in serie:
@ -357,7 +363,7 @@ class MetricsRepository(metrics_repository.AbstractMetricsRepository):
value_meta])
measurement = {u'name': serie['name'],
u'id': measurements_list[-1][0],
u'id': str(index),
u'columns': [u'timestamp', u'value',
u'value_meta'],
u'measurements': measurements_list}
@ -369,6 +375,7 @@ class MetricsRepository(metrics_repository.AbstractMetricsRepository):
if not key.startswith('_')}
json_measurement_list.append(measurement)
index += 1
return json_measurement_list
@ -439,6 +446,12 @@ class MetricsRepository(metrics_repository.AbstractMetricsRepository):
if not result:
return json_statistics_list
offset_id = 0
if offset is not None:
offset_tuple = offset.split('_')
offset_id = int(offset_tuple[0]) if len(offset_tuple) > 1 else 0
index = offset_id
for serie in result.raw['series']:
if 'values' in serie:
@ -451,11 +464,12 @@ class MetricsRepository(metrics_repository.AbstractMetricsRepository):
timestamp = stats[0]
if '.' in timestamp:
stats[0] = str(timestamp)[:19] + 'Z'
stats[1] = stats[1] or 0
stats_list.append(stats)
for stat in stats[1:]:
if stat is not None:
stats_list.append(stats)
statistic = {u'name': serie['name'],
u'id': stats_list[-1][0],
u'id': str(index),
u'columns': columns,
u'statistics': stats_list}
@ -466,6 +480,7 @@ class MetricsRepository(metrics_repository.AbstractMetricsRepository):
if not key.startswith('_')}
json_statistics_list.append(statistic)
index += 1
return json_statistics_list
@ -510,12 +525,14 @@ class MetricsRepository(metrics_repository.AbstractMetricsRepository):
return offset_clause
def _build_group_by_clause(self, group_by, period=None):
if group_by is not None and not isinstance(group_by, list):
group_by = str(group_by).split(',')
if group_by or period:
items = []
if group_by:
items.extend(group_by)
if period:
items.append("time(" + str(period) + "s)")
if group_by:
items.append('*')
clause = " group by " + ','.join(items)
else:
clause = ""
@ -547,7 +564,7 @@ class MetricsRepository(metrics_repository.AbstractMetricsRepository):
dimensions,
start_timestamp,
end_timestamp,
0,
None,
1,
False,
None)

View File

@ -288,6 +288,21 @@ def get_query_period(req):
raise HTTPUnprocessableEntityError('Unprocessable Entity', ex.message)
def get_query_group_by(req):
try:
params = falcon.uri.parse_query_string(req.query_string)
if 'group_by' in params:
group_by = params['group_by']
if not isinstance(group_by, list):
group_by = [group_by]
return group_by
else:
return None
except Exception as ex:
LOG.debug(ex)
raise HTTPUnprocessableEntityError('Unprocessable Entity', ex.message)
def validate_query_name(name):
"""Validates the query param name.
@ -556,7 +571,7 @@ def paginate_measurements(measurements, uri, limit):
['measurements'][:limit]),
u'name': measurement['name'],
u'columns': measurement['columns'],
u'id': new_offset}
u'id': measurement['id']}
measurement_elements.append(truncated_measurement)
break
else:
@ -641,7 +656,7 @@ def paginate_statistics(statistics, uri, limit):
u'statistics': (statistic['statistics'][:limit]),
u'name': statistic['name'],
u'columns': statistic['columns'],
u'id': new_offset}
u'id': statistic['id']}
statistic_elements.append(truncated_statistic)
break

View File

@ -182,7 +182,7 @@ class MetricsMeasurements(metrics_api_v2.MetricsMeasurementsV2API):
offset = helpers.get_query_param(req, 'offset')
limit = helpers.get_limit(req)
merge_metrics_flag = get_merge_metrics_flag(req)
group_by = helpers.get_query_param(req, "group_by")
group_by = helpers.get_query_group_by(req)
result = self._measurement_list(tenant_id, name, dimensions,
start_timestamp, end_timestamp,
@ -243,7 +243,7 @@ class MetricsStatistics(metrics_api_v2.MetricsStatisticsV2API):
offset = helpers.get_query_param(req, 'offset')
limit = helpers.get_limit(req)
merge_metrics_flag = get_merge_metrics_flag(req)
group_by = helpers.get_query_param(req, "group_by")
group_by = helpers.get_query_group_by(req)
result = self._metric_statistics(tenant_id, name, dimensions,
start_timestamp, end_timestamp,

View File

@ -127,3 +127,43 @@ def get_query_param(uri, query_param_name):
if query_param_name == parsed_query_name:
query_param_val = parsed_query_val
return query_param_val
def get_expected_elements_inner_offset_limit(all_elements, offset, limit, inner_key):
expected_elements = []
total_statistics = 0
if offset is None:
offset_id = 0
offset_time = ""
else:
offset_tuple = offset.split('_')
offset_id = int(offset_tuple[0]) if len(offset_tuple) > 1 else 0
offset_time = offset_tuple[1] if len(offset_tuple) > 1 else offset_tuple[0]
for element in all_elements:
element_id = int(element['id'])
if offset_id is not None and element_id < offset_id:
continue
next_element = None
for value in element[inner_key]:
if (element_id == offset_id and value[0] > offset_time) or \
element_id > offset_id:
if not next_element:
next_element = element.copy()
next_element[inner_key] = [value]
else:
next_element[inner_key].append(value)
total_statistics += 1
if total_statistics >= limit:
break
if next_element:
expected_elements.append(next_element)
if total_statistics >= limit:
break
for i in xrange(len(expected_elements)):
expected_elements[i]['id'] = str(i)
return expected_elements

View File

@ -61,16 +61,16 @@ class TestMeasurements(base.BaseMonascaTest):
metric3 = [
helpers.create_metric(
name=name2, timestamp=start_timestamp + ONE_SECOND * 3,
dimensions={'key1': 'value1', 'key2': 'value1'}),
dimensions={'key1': 'value1', 'key2': 'value5', 'key3': 'value7'}),
helpers.create_metric(
name=name2, timestamp=start_timestamp + ONE_SECOND * 3 + 10,
dimensions={'key1': 'value2', 'key2': 'value2'}),
dimensions={'key1': 'value2', 'key2': 'value5', 'key3': 'value7'}),
helpers.create_metric(
name=name2, timestamp=start_timestamp + ONE_SECOND * 3 + 20,
dimensions={'key1': 'value3', 'key2': 'value3'}),
dimensions={'key1': 'value3', 'key2': 'value6', 'key3': 'value7'}),
helpers.create_metric(
name=name2, timestamp=start_timestamp + ONE_SECOND * 3 + 30,
dimensions={'key1': 'value4', 'key2': 'value4'})
dimensions={'key1': 'value4', 'key2': 'value6', 'key3': 'value8'})
]
cls.monasca_client.create_metrics(metric3)
@ -194,7 +194,6 @@ class TestMeasurements(base.BaseMonascaTest):
self.assertRaises(exceptions.BadRequest,
self.monasca_client.list_measurements, query_parms)
@test.attr(type="gate")
def test_list_measurements_with_offset_limit(self):
query_parms = '?name=' + str(self._names_list[1]) + \
@ -259,7 +258,39 @@ class TestMeasurements(base.BaseMonascaTest):
self.assertEqual(200, resp.status)
@test.attr(type="gate")
def test_list_measurements_with_group_by(self):
def test_list_measurements_with_group_by_one(self):
query_parms = '?name=' + str(self._names_list[1]) + \
'&group_by=key2' + \
'&start_time=' + str(self._start_time) + \
'&end_time=' + str(self._end_time)
resp, response_body = self.monasca_client.list_measurements(
query_parms)
self.assertEqual(200, resp.status)
elements = response_body['elements']
self.assertEqual(len(elements), 2)
self._verify_list_measurements_elements(elements, None, None)
for measurements in elements:
self.assertEqual(1, len(measurements['dimensions'].keys()))
self.assertEqual([u'key2'], measurements['dimensions'].keys())
@test.attr(type="gate")
def test_list_measurements_with_group_by_multiple(self):
query_parms = '?name=' + str(self._names_list[1]) + \
'&group_by=key2,key3' + \
'&start_time=' + str(self._start_time) + \
'&end_time=' + str(self._end_time)
resp, response_body = self.monasca_client.list_measurements(
query_parms)
self.assertEqual(200, resp.status)
elements = response_body['elements']
self.assertEqual(len(elements), 3)
self._verify_list_measurements_elements(elements, None, None)
for measurements in elements:
self.assertEqual(2, len(measurements['dimensions'].keys()))
self.assertEqual({u'key2', u'key3'}, set(measurements['dimensions'].keys()))
@test.attr(type="gate")
def test_list_measurements_with_group_by_all(self):
query_parms = '?name=' + str(self._names_list[1]) + \
'&group_by=*' + \
'&start_time=' + str(self._start_time) + \

View File

@ -27,8 +27,6 @@ from urllib import urlencode
NUM_MEASUREMENTS = 100
MIN_REQUIRED_MEASUREMENTS = 2
WAIT_TIME = 30
metric_value1 = 1.23
metric_value2 = 4.56
class TestStatistics(base.BaseMonascaTest):
@ -48,12 +46,13 @@ class TestStatistics(base.BaseMonascaTest):
helpers.create_metric(name=name,
dimensions={key: value1},
timestamp=cls._start_timestamp,
value=metric_value1),
value=1.23),
helpers.create_metric(name=name,
dimensions={key: value2},
timestamp=cls._start_timestamp + 1000,
value=metric_value2)
value=4.56)
]
cls.metric_values = [m['value'] for m in metrics]
cls.monasca_client.create_metrics(metrics)
start_time_iso = helpers.timestamp_to_iso(cls._start_timestamp)
query_param = '?name=' + str(name) + '&start_time=' + \
@ -67,19 +66,53 @@ class TestStatistics(base.BaseMonascaTest):
resp, response_body = cls.monasca_client.\
list_measurements(query_param)
elements = response_body['elements']
for element in elements:
if str(element['name']) == name:
if len(element['measurements']) >= MIN_REQUIRED_MEASUREMENTS:
cls._end_timestamp = cls._start_timestamp + 1000 * 3
cls._end_time_iso = helpers.timestamp_to_iso(
cls._end_timestamp)
return
else:
num_measurements = len(element['measurements'])
break
if len(elements) > 0:
num_measurements = len(elements[0]['measurements'])
if num_measurements >= MIN_REQUIRED_MEASUREMENTS:
break
time.sleep(constants.RETRY_WAIT_SECS)
assert False, "Required {} measurements, found {}".format(MIN_REQUIRED_MEASUREMENTS, num_measurements)
if num_measurements < MIN_REQUIRED_MEASUREMENTS:
assert False, "Required {} measurements, found {}".format(MIN_REQUIRED_MEASUREMENTS, num_measurements)
cls._end_timestamp = cls._start_timestamp + 3000
cls._end_time_iso = helpers.timestamp_to_iso(cls._end_timestamp)
name2 = data_utils.rand_name("group-by")
cls._group_by_metric_name = name2
cls._group_by_end_time_iso = helpers.timestamp_to_iso(cls._start_timestamp + 4000)
group_by_metrics = [
helpers.create_metric(name=name2, dimensions={'key1': 'value1', 'key2': 'value5', 'key3': 'value7'},
timestamp=cls._start_timestamp + 1, value=2),
helpers.create_metric(name=name2, dimensions={'key1': 'value2', 'key2': 'value5', 'key3': 'value7'},
timestamp=cls._start_timestamp + 1001, value=3),
helpers.create_metric(name=name2, dimensions={'key1': 'value3', 'key2': 'value6', 'key3': 'value7'},
timestamp=cls._start_timestamp + 2001, value=5),
helpers.create_metric(name=name2, dimensions={'key1': 'value4', 'key2': 'value6', 'key3': 'value8'},
timestamp=cls._start_timestamp + 3001, value=7),
]
cls.monasca_client.create_metrics(group_by_metrics)
query_param = '?name=' + str(name2) + \
'&start_time=' + start_time_iso + \
'&merge_metrics=true' + \
'&end_time=' + cls._group_by_end_time_iso
num_measurements = 0
for i in xrange(constants.MAX_RETRIES):
resp, response_body = cls.monasca_client. \
list_measurements(query_param)
elements = response_body['elements']
if len(elements) > 0:
num_measurements = len(elements[0]['measurements'])
if num_measurements >= len(group_by_metrics):
break
time.sleep(constants.RETRY_WAIT_SECS)
if num_measurements < len(group_by_metrics):
assert False, "Required {} measurements, found {}".format(len(group_by_metrics),
response_body)
@classmethod
def resource_cleanup(cls):
@ -102,8 +135,7 @@ class TestStatistics(base.BaseMonascaTest):
num_statistics_method = 5
statistics = element['statistics'][0]
self._verify_column_and_statistics(
column, num_statistics_method, statistics, metric_value1,
metric_value2)
column, num_statistics_method, statistics, self.metric_values)
@test.attr(type="gate")
@test.attr(type=['negative'])
@ -270,6 +302,87 @@ class TestStatistics(base.BaseMonascaTest):
# Get the next set
offset = self._get_offset(response_body)
@test.attr(type="gate")
def test_list_statistics_with_group_by_one(self):
query_parms = '?name=' + self._group_by_metric_name + \
'&group_by=key2' + \
'&statistics=max,avg,min' + \
'&start_time=' + str(self._start_time_iso) + \
'&end_time=' + str(self._group_by_end_time_iso)
resp, response_body = self.monasca_client.list_statistics(
query_parms)
self.assertEqual(200, resp.status)
elements = response_body['elements']
self.assertEqual(len(elements), 2)
for statistics in elements:
self.assertEqual(1, len(statistics['dimensions'].keys()))
self.assertEqual([u'key2'], statistics['dimensions'].keys())
@test.attr(type="gate")
def test_list_statistics_with_group_by_multiple(self):
query_parms = '?name=' + self._group_by_metric_name + \
'&group_by=key2,key3' + \
'&statistics=max,avg,min' + \
'&start_time=' + str(self._start_time_iso) + \
'&end_time=' + str(self._group_by_end_time_iso)
resp, response_body = self.monasca_client.list_statistics(
query_parms)
self.assertEqual(200, resp.status)
elements = response_body['elements']
self.assertEqual(len(elements), 3)
for statistics in elements:
self.assertEqual(2, len(statistics['dimensions'].keys()))
self.assertEqual({u'key2', u'key3'}, set(statistics['dimensions'].keys()))
@test.attr(type="gate")
def test_list_statistics_with_group_by_all(self):
query_parms = '?name=' + self._group_by_metric_name + \
'&group_by=*' + \
'&statistics=max,avg,min' + \
'&start_time=' + str(self._start_time_iso) + \
'&end_time=' + str(self._group_by_end_time_iso)
resp, response_body = self.monasca_client.list_statistics(
query_parms)
self.assertEqual(200, resp.status)
elements = response_body['elements']
self.assertEqual(len(elements), 4)
@test.attr(type="gate")
def test_list_statistics_with_group_by_offset_limit(self):
query_parms = '?name=' + str(self._group_by_metric_name) + \
'&group_by=key2' + \
'&statistics=avg,max' + \
'&start_time=' + str(self._start_time_iso) + \
'&end_time=' + str(self._group_by_end_time_iso) + \
'&period=1'
resp, response_body = self.monasca_client.list_statistics(query_parms)
self.assertEqual(200, resp.status)
all_expected_elements = response_body['elements']
for limit in xrange(1, 4):
offset = None
for i in xrange(4 - limit):
query_parms = '?name=' + str(self._group_by_metric_name) + \
'&group_by=key2' + \
'&statistics=avg,max' + \
'&start_time=' + str(self._start_time_iso) + \
'&end_time=' + str(self._group_by_end_time_iso) + \
'&period=1' + \
'&limit=' + str(limit)
if i > 0:
offset = self._get_offset(response_body)
query_parms += "&offset=" + offset
expected_elements = helpers.get_expected_elements_inner_offset_limit(
all_expected_elements,
offset,
limit,
'statistics')
resp, response_body = self.monasca_client.list_statistics(query_parms)
self.assertEqual(200, resp.status)
self.assertEqual(expected_elements, response_body['elements'])
@test.attr(type="gate")
@test.attr(type=['negative'])
def test_list_statistics_with_no_merge_metrics(self):
@ -342,22 +455,22 @@ class TestStatistics(base.BaseMonascaTest):
self.assertEqual(element['name'], self._test_name)
def _verify_column_and_statistics(
self, column, num_statistics_method, statistics, num1, num2):
self, column, num_statistics_method, statistics, values):
self.assertTrue(type(column) is list)
self.assertTrue(type(statistics) is list)
self.assertEqual(len(column), num_statistics_method + 1)
self.assertEqual(column[0], 'timestamp')
for i, method in enumerate(column):
if method == 'avg':
self.assertAlmostEqual(statistics[i], (num1 + num2) / 2)
self.assertAlmostEqual(statistics[i], float(sum(values) / len(values)))
elif method == 'max':
self.assertEqual(statistics[i], max(num1, num2))
self.assertEqual(statistics[i], max(values))
elif method == 'min':
self.assertEqual(statistics[i], min(num1, num2))
self.assertEqual(statistics[i], min(values))
elif method == 'sum':
self.assertAlmostEqual(statistics[i], num1 + num2)
self.assertAlmostEqual(statistics[i], sum(values))
elif method == 'count':
self.assertEqual(statistics[i], 2)
self.assertEqual(statistics[i], len(values))
def _check_timeout(self, timer, max_retries, elements,
expect_num_elements):