From 4645a24fc740ee6f81649e8c0dfa69ba00837f34 Mon Sep 17 00:00:00 2001
From: Brad Klein
Date: Mon, 11 Apr 2016 08:25:24 -0600
Subject: [PATCH] Replace measurements table join with subquery that fits in memory

Our infrastructure project now has enough measurements in Vertica that the
inner join of the DefinitionDimensions table to the Measurements table no
longer fits in memory, making the measurements (not statistics) API a land
mine. We receive the following Vertica warning (8389):

  'Join inner did not fit in memory [(MonMetrics.DefinitionDimensions x
  MonMetrics.Measurements)'

If a user loads a dashboard without applying a statistics function, it can
completely lock up the monasca-api process and require a service restart.

This change removes the join and replaces it with two more efficient
queries, more like the statistics interface.

Change-Id: I8e686bf2371e629809bc53b309c766848aaad827
Closes-Bug: #1565977
---
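Note for reviewers: the sketch below is only an illustration of the query
shapes before and after this change, assembled by hand from the SQL templates
in the diff (FIND_BY_METRIC_DEF_SQL and DEFDIM_IDS_SELECT). The name and
dimension clauses and the hex ids shown are made-up examples; at runtime they
are produced by the MetricQueries helpers (buildDimensionAndClause,
createDefDimIdInClause) and by named-parameter binding.

    -- Before: one query joining Measurements to Definitions/DefinitionDimensions
    select def.name, mes.definition_dimensions_id, defdims.dimension_set_id, defdims.definition_id,
           mes.time_stamp, mes.value, mes.value_meta
      from MonMetrics.Measurements mes, MonMetrics.Definitions def, MonMetrics.DefinitionDimensions defdims
     where mes.definition_dimensions_id = defdims.id
       and def.id = defdims.definition_id
       and def.tenant_id = :tenantId
       and mes.time_stamp >= :startTime
       and def.name = :name                      -- optional name clause
       and ...                                   -- optional dimension clauses
     order by mes.time_stamp ASC
     limit :limit

    -- After, step 1: resolve the small set of matching definition-dimension ids
    SELECT defDims.id, defDims.dimension_set_id, defDims.definition_id
      FROM MonMetrics.Definitions def, MonMetrics.DefinitionDimensions defDims
     WHERE defDims.definition_id = def.id
       AND def.tenant_id = :tenantId
       AND def.name = :name                      -- optional name clause
       AND ...                                   -- optional dimension clauses

    -- After, step 2: scan Measurements alone, filtered by those ids
    select mes.definition_dimensions_id, mes.time_stamp, mes.value, mes.value_meta
      from MonMetrics.Measurements mes
     where to_hex(mes.definition_dimensions_id) in ('DEADBEEF...', ...)  -- example ids only
       and mes.time_stamp <= :endTime            -- optional endTime/offset clauses
       and mes.time_stamp >= :startTime
     order by mes.time_stamp ASC
     limit :limit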
 .../vertica/MeasurementVerticaRepoImpl.java        | 183 +++++++++---------
 1 file changed, 92 insertions(+), 91 deletions(-)

diff --git a/java/src/main/java/monasca/api/infrastructure/persistence/vertica/MeasurementVerticaRepoImpl.java b/java/src/main/java/monasca/api/infrastructure/persistence/vertica/MeasurementVerticaRepoImpl.java
index 2e7101c24..8d2f5e9cd 100644
--- a/java/src/main/java/monasca/api/infrastructure/persistence/vertica/MeasurementVerticaRepoImpl.java
+++ b/java/src/main/java/monasca/api/infrastructure/persistence/vertica/MeasurementVerticaRepoImpl.java
@@ -25,9 +25,12 @@
 import java.nio.ByteBuffer;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Iterator;
+import java.util.Set;
 import javax.annotation.Nullable;
 import javax.inject.Inject;
@@ -51,18 +54,25 @@ public class MeasurementVerticaRepoImpl implements MeasurementRepo {
       ISODateTimeFormat.dateTime().withZoneUTC();

   private static final String FIND_BY_METRIC_DEF_SQL =
-      "select def.name, mes.definition_dimensions_id, defdims.dimension_set_id, defdims.definition_id, "
+      "select mes.definition_dimensions_id, "
       + "mes.time_stamp, mes.value, mes.value_meta "
-      + "from MonMetrics.Measurements mes, MonMetrics.Definitions def, MonMetrics.DefinitionDimensions defdims "
-      + "where mes.definition_dimensions_id = defdims.id "
-      + "and def.id = defdims.definition_id "
-      + "and def.tenant_id = :tenantId "
+      + "from MonMetrics.Measurements mes "
+      + "where to_hex(mes.definition_dimensions_id) "
+      + "%s " // defdim IN clause here
+      + "%s " // endtime and offset here
       + "and mes.time_stamp >= :startTime "
-      + "%s " // metric name here
-      + "%s " // dimension and clause here
       + "order by mes.time_stamp ASC "
       + "limit :limit";

+  private static final String
+      DEFDIM_IDS_SELECT =
+      "SELECT defDims.id, defDims.dimension_set_id, defDims.definition_id "
+      + "FROM MonMetrics.Definitions def, MonMetrics.DefinitionDimensions defDims "
+      + "WHERE defDims.definition_id = def.id "
+      + "AND def.tenant_id = :tenantId "
+      + "%s " // Name clause here
+      + "%s;"; // Dimensions and clause goes here
+
   private static final String TABLE_TO_JOIN_DIMENSIONS_ON = "defDims";

   private final DBI db;
@@ -91,85 +101,97 @@ public class MeasurementVerticaRepoImpl implements MeasurementRepo {

     try (Handle h = db.open()) {

-      StringBuilder sb = new StringBuilder();
+      Map<ByteBuffer, Measurements> results = new LinkedHashMap<>();
+
+      Set<byte[]> defDimIdSet = new HashSet<>();
+      Set<byte[]> dimSetIdSet = new HashSet<>();
+
+      String namePart = "";

       if (name != null && !name.isEmpty()) {
-
-        sb.append(" and def.name = :name");
-
+        namePart = "AND def.name = :name ";
       }

-      if (endTime != null) {
+      String defDimSql = String.format(
+          DEFDIM_IDS_SELECT,
+          namePart,
+          MetricQueries.buildDimensionAndClause(dimensions, "defDims", 0));

-        sb.append(" and mes.time_stamp <= :endTime");
-
-      }
-
-      if (offset != null && !offset.isEmpty()) {
-
-        sb.append(" and time_stamp > :offset");
-
-      }
-
-      String sql =
-          String.format(FIND_BY_METRIC_DEF_SQL,
-                        sb,
-                        MetricQueries.buildDimensionAndClause(dimensions,
-                                                              TABLE_TO_JOIN_DIMENSIONS_ON,
-                                                              0)); // no limit for dim set ids
-
-      Query<Map<String, Object>> query =
-          h.createQuery(sql)
-              .bind("tenantId", tenantId)
-              .bind("startTime", new Timestamp(startTime.getMillis()))
-              .bind("limit", limit + 1);
-
-      if (name != null && !name.isEmpty()) {
-
-        logger.debug("binding name: {}", name);
-
-        query.bind("name", name);
-
-      }
-
-      if (endTime != null) {
-
-        logger.debug("binding endtime: {}", endTime);
-
-        query.bind("endTime", new Timestamp(endTime.getMillis()));
-
-      }
-
-      if (offset != null && !offset.isEmpty()) {
-
-        logger.debug("binding offset: {}", offset);
-
-        query.bind("offset", new Timestamp(DateTime.parse(offset).getMillis()));
-
-      }
+      Query<Map<String, Object>> query = h.createQuery(defDimSql).bind("tenantId", tenantId);

       MetricQueries.bindDimensionsToQuery(query, dimensions);

+      if (name != null && !name.isEmpty()) {
+        query.bind("name", name);
+      }
+
       List<Map<String, Object>> rows = query.list();

-      Map<ByteBuffer, Measurements> results = new LinkedHashMap<>();
+      ByteBuffer defId = ByteBuffer.wrap(new byte[0]);

       for (Map<String, Object> row : rows) {

-        String metricName = (String) row.get("name");
-
-        byte[] defIdBytes = (byte[]) row.get("definition_id");
-
-        ByteBuffer defId = ByteBuffer.wrap(defIdBytes);
-
-        byte[] defdimsIdBytes = (byte[]) row.get("definition_dimensions_id");
+        byte[] defDimId = (byte[]) row.get("id");
+        defDimIdSet.add(defDimId);

         byte[] dimSetIdBytes = (byte[]) row.get("dimension_set_id");
+        dimSetIdSet.add(dimSetIdBytes);

-        ByteBuffer defdimsId = ByteBuffer.wrap(defdimsIdBytes);
+        byte[] defIdBytes = (byte[]) row.get("definition_id");
+        defId = ByteBuffer.wrap(defIdBytes);
+
+      }
+
+      if (!Boolean.TRUE.equals(mergeMetricsFlag) && (dimSetIdSet.size() > 1)) {
+        throw new MultipleMetricsException(name, dimensions);
+      }
+
+      //
+      // If we didn't find any definition dimension ids,
+      // we won't have any measurements, let's just bail
+      // now.
+      //
+      if (defDimIdSet.size() == 0) {
+        return new ArrayList<>(results.values());
+      }
+
+      String defDimInClause = MetricQueries.createDefDimIdInClause(defDimIdSet);
+
+      StringBuilder sb = new StringBuilder();
+
+      if (endTime != null) {
+        sb.append(" and time_stamp <= :endTime");
+      }
+
+      if (offset != null && !offset.isEmpty()) {
+        sb.append(" and time_stamp > :offset");
+      }
+
+      String sql = String.format(FIND_BY_METRIC_DEF_SQL, defDimInClause, sb);
+
+      query = h.createQuery(sql)
+          .bind("startTime", new Timestamp(startTime.getMillis()))
+          .bind("limit", limit + 1);
+
+      if (endTime != null) {
+        logger.debug("binding endtime: {}", endTime);
+        query.bind("endTime", new Timestamp(endTime.getMillis()));
+      }
+
+      if (offset != null && !offset.isEmpty()) {
+        logger.debug("binding offset: {}", offset);
+        query.bind("offset", new Timestamp(DateTime.parse(offset).getMillis()));
+      }
+
+      rows = query.list();
+
+      for (Map<String, Object> row : rows) {

         String timestamp = DATETIME_FORMATTER.print(((Timestamp) row.get("time_stamp")).getTime());

+        byte[] defdimsIdBytes = (byte[]) row.get("definition_dimensions_id");
+        ByteBuffer defdimsId = ByteBuffer.wrap(defdimsIdBytes);
+
         double value = (double) row.get("value");

         String valueMetaString = (String) row.get("value_meta");
@@ -189,41 +211,20 @@ public class MeasurementVerticaRepoImpl implements MeasurementRepo {

         }

-        Measurements measurements;
-
-        if (Boolean.TRUE.equals(mergeMetricsFlag)) {
-
-          measurements = results.get(defId);
-
-        } else {
-
-          measurements = results.get(defdimsId);
-
-        }
+        Measurements measurements = (Boolean.TRUE.equals(mergeMetricsFlag)) ? results.get(defId) : results.get(defdimsId);

         if (measurements == null) {
-
           if (Boolean.TRUE.equals(mergeMetricsFlag)) {
-
             measurements =
-                new Measurements(metricName, new HashMap<String, String>(),
+                new Measurements(name, new HashMap<String, String>(),
                                  new ArrayList<Object[]>());
             results.put(defId, measurements);
-
           } else {
-
             measurements =
-                new Measurements(metricName, MetricQueries.dimensionsFor(h, dimSetIdBytes),
+                new Measurements(name, MetricQueries.dimensionsFor(h, (byte[]) dimSetIdSet.toArray()[0]),
                                  new ArrayList<Object[]>());
-
             results.put(defdimsId, measurements);
-
-            if (results.keySet().size() > 1) {
-
-              throw new MultipleMetricsException(name, dimensions);
-
-            }
           }
         }
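Note: the "%s" IN clause in FIND_BY_METRIC_DEF_SQL is produced by
MetricQueries.createDefDimIdInClause(defDimIdSet), whose implementation is
outside this diff. As a rough sketch only, a helper of that shape could look
like the following; the class name DefDimIdClauseSketch, the lower-case hex
formatting, and the exact clause layout are assumptions for illustration, not
the actual monasca-api code.

    import java.util.Set;

    final class DefDimIdClauseSketch {

      // Hypothetical stand-in for MetricQueries.createDefDimIdInClause: turns the
      // byte[] definition-dimension ids collected by the first query into an
      // "IN (...)" clause that is substituted into FIND_BY_METRIC_DEF_SQL and
      // compared against to_hex(mes.definition_dimensions_id).
      static String createDefDimIdInClause(Set<byte[]> defDimIdSet) {
        StringBuilder sb = new StringBuilder("IN (");
        boolean first = true;
        for (byte[] defDimId : defDimIdSet) {
          if (!first) {
            sb.append(", ");
          }
          first = false;
          sb.append('\'');
          for (byte b : defDimId) {
            // hex-encode each byte; the case must match what Vertica's to_hex() returns
            sb.append(String.format("%02x", b));
          }
          sb.append('\'');
        }
        return sb.append(") ").toString();
      }
    }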