Replace measurements table join with subquery that fits in memory

Our infrastructure project now has so many measurements in
Vertica that the inner join of the DefinitionDimensions
table to the Measurements table no longer fits in memory --
making the measurements (not statistics) API a land mine.  We receive
the following Vertica warning (8389):

'Join inner did not fit in memory [(MonMetrics.DefinitionDimensions x MonMetrics.Measurements)'

If a user loads a dashboard without applying a statistics function, it
can completely lock up the monasca-api process and require a service restart.

This change removes the join and replaces it with two more efficient
queries, more like the statistics interface.

Change-Id: I8e686bf2371e629809bc53b309c766848aaad827
Closes-Bug: #1565977
This commit is contained in:
Brad Klein 2016-04-11 08:25:24 -06:00
parent 17c9068729
commit 4645a24fc7
1 changed files with 92 additions and 91 deletions

View File

@ -25,9 +25,12 @@ import java.nio.ByteBuffer;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Iterator;
import java.util.Set;
import javax.annotation.Nullable;
import javax.inject.Inject;
@ -51,18 +54,25 @@ public class MeasurementVerticaRepoImpl implements MeasurementRepo {
ISODateTimeFormat.dateTime().withZoneUTC();
private static final String FIND_BY_METRIC_DEF_SQL =
"select def.name, mes.definition_dimensions_id, defdims.dimension_set_id, defdims.definition_id, "
"select mes.definition_dimensions_id, "
+ "mes.time_stamp, mes.value, mes.value_meta "
+ "from MonMetrics.Measurements mes, MonMetrics.Definitions def, MonMetrics.DefinitionDimensions defdims "
+ "where mes.definition_dimensions_id = defdims.id "
+ "and def.id = defdims.definition_id "
+ "and def.tenant_id = :tenantId "
+ "from MonMetrics.Measurements mes "
+ "where to_hex(mes.definition_dimensions_id) "
+ "%s " // defdim IN clause here
+ "%s " // endtime and offset here
+ "and mes.time_stamp >= :startTime "
+ "%s " // metric name here
+ "%s " // dimension and clause here
+ "order by mes.time_stamp ASC "
+ "limit :limit";
/*
 * SQL template that looks up definition-dimension rows for a tenant.
 *
 * Selects id, dimension_set_id and definition_id from
 * MonMetrics.DefinitionDimensions joined to MonMetrics.Definitions on
 * definition_id, filtered by the :tenantId bind parameter.
 *
 * The template has two %s slots that are filled via String.format before
 * the query is created:
 *   1. an optional metric-name clause ("AND def.name = :name " or empty);
 *   2. the dimension AND-clause produced by
 *      MetricQueries.buildDimensionAndClause(...).
 */
private static final String
DEFDIM_IDS_SELECT =
"SELECT defDims.id, defDims.dimension_set_id, defDims.definition_id "
+ "FROM MonMetrics.Definitions def, MonMetrics.DefinitionDimensions defDims "
+ "WHERE defDims.definition_id = def.id "
+ "AND def.tenant_id = :tenantId "
+ "%s " // slot 1: optional metric-name clause
+ "%s;"; // slot 2: dimension AND-clause
private static final String TABLE_TO_JOIN_DIMENSIONS_ON = "defDims";
private final DBI db;
@ -91,85 +101,97 @@ public class MeasurementVerticaRepoImpl implements MeasurementRepo {
try (Handle h = db.open()) {
StringBuilder sb = new StringBuilder();
Map<ByteBuffer, Measurements> results = new LinkedHashMap<>();
Set<byte[]> defDimIdSet = new HashSet<>();
Set<byte[]> dimSetIdSet = new HashSet<>();
String namePart = "";
if (name != null && !name.isEmpty()) {
sb.append(" and def.name = :name");
namePart = "AND def.name = :name ";
}
if (endTime != null) {
String defDimSql = String.format(
DEFDIM_IDS_SELECT,
namePart,
MetricQueries.buildDimensionAndClause(dimensions, "defDims", 0));
sb.append(" and mes.time_stamp <= :endTime");
}
if (offset != null && !offset.isEmpty()) {
sb.append(" and time_stamp > :offset");
}
String sql =
String.format(FIND_BY_METRIC_DEF_SQL,
sb,
MetricQueries.buildDimensionAndClause(dimensions,
TABLE_TO_JOIN_DIMENSIONS_ON,
0)); // no limit for dim set ids
Query<Map<String, Object>> query =
h.createQuery(sql)
.bind("tenantId", tenantId)
.bind("startTime", new Timestamp(startTime.getMillis()))
.bind("limit", limit + 1);
if (name != null && !name.isEmpty()) {
logger.debug("binding name: {}", name);
query.bind("name", name);
}
if (endTime != null) {
logger.debug("binding endtime: {}", endTime);
query.bind("endTime", new Timestamp(endTime.getMillis()));
}
if (offset != null && !offset.isEmpty()) {
logger.debug("binding offset: {}", offset);
query.bind("offset", new Timestamp(DateTime.parse(offset).getMillis()));
}
Query<Map<String, Object>> query = h.createQuery(defDimSql).bind("tenantId", tenantId);
MetricQueries.bindDimensionsToQuery(query, dimensions);
if (name != null && !name.isEmpty()) {
query.bind("name", name);
}
List<Map<String, Object>> rows = query.list();
Map<ByteBuffer, Measurements> results = new LinkedHashMap<>();
ByteBuffer defId = ByteBuffer.wrap(new byte[0]);
for (Map<String, Object> row : rows) {
String metricName = (String) row.get("name");
byte[] defIdBytes = (byte[]) row.get("definition_id");
ByteBuffer defId = ByteBuffer.wrap(defIdBytes);
byte[] defdimsIdBytes = (byte[]) row.get("definition_dimensions_id");
byte[] defDimId = (byte[]) row.get("id");
defDimIdSet.add(defDimId);
byte[] dimSetIdBytes = (byte[]) row.get("dimension_set_id");
dimSetIdSet.add(dimSetIdBytes);
ByteBuffer defdimsId = ByteBuffer.wrap(defdimsIdBytes);
byte[] defIdBytes = (byte[]) row.get("definition_id");
defId = ByteBuffer.wrap(defIdBytes);
}
if (!Boolean.TRUE.equals(mergeMetricsFlag) && (dimSetIdSet.size() > 1)) {
throw new MultipleMetricsException(name, dimensions);
}
//
// If we didn't find any definition dimension ids,
// we won't have any measurements, let's just bail
// now.
//
if (defDimIdSet.size() == 0) {
return new ArrayList<>(results.values());
}
String defDimInClause = MetricQueries.createDefDimIdInClause(defDimIdSet);
StringBuilder sb = new StringBuilder();
if (endTime != null) {
sb.append(" and time_stamp <= :endTime");
}
if (offset != null && !offset.isEmpty()) {
sb.append(" and time_stamp > :offset");
}
String sql = String.format(FIND_BY_METRIC_DEF_SQL, defDimInClause, sb);
query = h.createQuery(sql)
.bind("startTime", new Timestamp(startTime.getMillis()))
.bind("limit", limit + 1);
if (endTime != null) {
logger.debug("binding endtime: {}", endTime);
query.bind("endTime", new Timestamp(endTime.getMillis()));
}
if (offset != null && !offset.isEmpty()) {
logger.debug("binding offset: {}", offset);
query.bind("offset", new Timestamp(DateTime.parse(offset).getMillis()));
}
rows = query.list();
for (Map<String, Object> row : rows) {
String timestamp = DATETIME_FORMATTER.print(((Timestamp) row.get("time_stamp")).getTime());
byte[] defdimsIdBytes = (byte[]) row.get("definition_dimensions_id");
ByteBuffer defdimsId = ByteBuffer.wrap(defdimsIdBytes);
double value = (double) row.get("value");
String valueMetaString = (String) row.get("value_meta");
@ -189,41 +211,20 @@ public class MeasurementVerticaRepoImpl implements MeasurementRepo {
}
Measurements measurements;
if (Boolean.TRUE.equals(mergeMetricsFlag)) {
measurements = results.get(defId);
} else {
measurements = results.get(defdimsId);
}
Measurements measurements = (Boolean.TRUE.equals(mergeMetricsFlag)) ? results.get(defId) : results.get(defdimsId);
if (measurements == null) {
if (Boolean.TRUE.equals(mergeMetricsFlag)) {
measurements =
new Measurements(metricName, new HashMap<String, String>(),
new Measurements(name, new HashMap<String, String>(),
new ArrayList<Object[]>());
results.put(defId, measurements);
} else {
measurements =
new Measurements(metricName, MetricQueries.dimensionsFor(h, dimSetIdBytes),
new Measurements(name, MetricQueries.dimensionsFor(h, (byte[]) dimSetIdSet.toArray()[0]),
new ArrayList<Object[]>());
results.put(defdimsId, measurements);
if (results.keySet().size() > 1) {
throw new MultipleMetricsException(name, dimensions);
}
}
}