Manage persistent cache versioning outside of Java serialization

The flip side of switching to a new serialization format is that, unlike
Java serialization, the version is no longer stored in the serialized
representation, so we need to manage versioning ourselves.

Define a version field in PersistentCacheDef, and store this in a new
column in the H2 database. This puts slightly more burden on the cache
implementation to manually manage versions, but makes the process of
updating caches safer; the latter happens much more often than the
former, so the extra up-front effort is worth it.

For existing caches using Java serialization, continue using the default
version of 0. This keeps the same invalidation semantics, and we can
bump the version to 1 when we start implementing new serialization
strategies.

Change-Id: I0d9cccea713fe6dcf368d8b95be59465c336bbb3
This commit is contained in:
Dave Borowitz 2018-04-21 11:25:41 -04:00
parent 7ae7fb1cc0
commit 49c1c16097
8 changed files with 132 additions and 38 deletions

View File

@ -149,7 +149,8 @@ public abstract class CacheModule extends FactoryModule {
PersistentCacheProvider<K, V> m = new PersistentCacheProvider<>(this, name, keyType, valType);
bindCache(m, name, keyType, valType);
// TODO(dborowitz): Once default Java serialization is removed, leave no default.
return m.keySerializer(new JavaCacheSerializer<>())
return m.version(0)
.keySerializer(new JavaCacheSerializer<>())
.valueSerializer(new JavaCacheSerializer<>());
}

View File

@ -32,6 +32,8 @@ public interface PersistentCacheBinding<K, V> extends CacheBinding<K, V> {
@Override
PersistentCacheBinding<K, V> weigher(Class<? extends Weigher<K, V>> clazz);
PersistentCacheBinding<K, V> version(int version);
/** Set the total on-disk limit of the cache. */
PersistentCacheBinding<K, V> diskLimit(long limit);

View File

@ -17,6 +17,8 @@ package com.google.gerrit.server.cache;
public interface PersistentCacheDef<K, V> extends CacheDef<K, V> {
long diskLimit();
int version();
CacheSerializer<K> keySerializer();
CacheSerializer<V> valueSerializer();

View File

@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
class PersistentCacheProvider<K, V> extends CacheProvider<K, V>
implements Provider<Cache<K, V>>, PersistentCacheBinding<K, V>, PersistentCacheDef<K, V> {
private int version;
private long diskLimit;
private CacheSerializer<K> keySerializer;
private CacheSerializer<V> valueSerializer;
@ -36,6 +37,7 @@ class PersistentCacheProvider<K, V> extends CacheProvider<K, V>
// Creates a persistent cache binding with the version initially unset.
PersistentCacheProvider(
CacheModule module, String name, TypeLiteral<K> keyType, TypeLiteral<V> valType) {
super(module, name, keyType, valType);
// -1 is a sentinel meaning "version not yet configured"; get() rejects it
// via checkState(version >= 0), forcing every binding to call version().
version = -1;
}
@Inject(optional = true)
@ -63,6 +65,12 @@ class PersistentCacheProvider<K, V> extends CacheProvider<K, V>
return (PersistentCacheBinding<K, V>) super.weigher(clazz);
}
/**
 * Sets the cache schema version. The version is stored in a column alongside
 * each persisted entry, and lookups only match entries with this version, so
 * bumping it effectively invalidates previously stored data.
 */
@Override
public PersistentCacheBinding<K, V> version(int version) {
this.version = version;
return this;
}
@Override
public PersistentCacheBinding<K, V> keySerializer(CacheSerializer<K> keySerializer) {
this.keySerializer = keySerializer;
@ -90,6 +98,11 @@ class PersistentCacheProvider<K, V> extends CacheProvider<K, V>
return 128 << 20;
}
/** Returns the configured schema version, or -1 if version() was never called. */
@Override
public int version() {
return version;
}
@Override
public CacheSerializer<K> keySerializer() {
return keySerializer;
@ -105,6 +118,7 @@ class PersistentCacheProvider<K, V> extends CacheProvider<K, V>
if (persistentCacheFactory == null) {
return super.get();
}
checkState(version >= 0, "version is required");
checkState(keySerializer != null, "keySerializer is required");
checkState(valueSerializer != null, "valueSerializer is required");
freeze();

View File

@ -88,6 +88,11 @@ class H2CacheDefProxy<K, V> implements PersistentCacheDef<K, V> {
return source.loader();
}
/** Delegates to the wrapped cache definition's schema version. */
@Override
public int version() {
return source.version();
}
@Override
public CacheSerializer<K> keySerializer() {
return source.keySerializer();

View File

@ -222,6 +222,7 @@ class H2CacheFactory implements PersistentCacheFactory, LifecycleListener {
def.keyType(),
def.keySerializer(),
def.valueSerializer(),
def.version(),
maxSize,
expireAfterWrite == null ? 0 : expireAfterWrite.longValue());
}

View File

@ -253,6 +253,7 @@ public class H2CacheImpl<K, V> extends AbstractLoadingCache<K, V> implements Per
private final String url;
private final KeyType<K> keyType;
private final CacheSerializer<V> valueSerializer;
private final int version;
private final long maxSize;
private final long expireAfterWrite;
private final BlockingQueue<SqlHandle> handles;
@ -266,11 +267,13 @@ public class H2CacheImpl<K, V> extends AbstractLoadingCache<K, V> implements Per
TypeLiteral<K> keyType,
CacheSerializer<K> keySerializer,
CacheSerializer<V> valueSerializer,
int version,
long maxSize,
long expireAfterWrite) {
this.url = jdbcUrl;
this.keyType = createKeyType(keyType, keySerializer);
this.valueSerializer = valueSerializer;
this.version = version;
this.maxSize = maxSize;
this.expireAfterWrite = expireAfterWrite;
@ -319,32 +322,42 @@ public class H2CacheImpl<K, V> extends AbstractLoadingCache<K, V> implements Per
SqlHandle c = null;
try {
c = acquire();
try (Statement s = c.conn.createStatement()) {
if (estimatedSize <= 0) {
try (ResultSet r = s.executeQuery("SELECT COUNT(*) FROM data")) {
if (estimatedSize <= 0) {
try (PreparedStatement ps =
c.conn.prepareStatement("SELECT COUNT(*) FROM data WHERE version=?")) {
ps.setInt(1, version);
try (ResultSet r = ps.executeQuery()) {
estimatedSize = r.next() ? r.getInt(1) : 0;
}
}
}
BloomFilter<K> b = newBloomFilter();
try (ResultSet r = s.executeQuery("SELECT k FROM data")) {
BloomFilter<K> b = newBloomFilter();
try (PreparedStatement ps = c.conn.prepareStatement("SELECT k FROM data WHERE version=?")) {
ps.setInt(1, version);
try (ResultSet r = ps.executeQuery()) {
while (r.next()) {
b.put(keyType.get(r, 1));
}
} catch (JdbcSQLException e) {
if (e.getCause() instanceof InvalidClassException) {
log.warn(
"Entries cached for "
+ url
+ " have an incompatible class and can't be deserialized. "
+ "Cache is flushed.");
invalidateAll();
} else {
throw e;
}
}
return b;
} catch (JdbcSQLException e) {
if (e.getCause() instanceof InvalidClassException) {
// If deserialization failed using default Java serialization, this means we are using
// the old serialVersionUID-based invalidation strategy. In that case, authors are
// most likely bumping serialVersionUID rather than using the new versioning in the
// CacheBinding. That's ok; we'll continue to support both for now.
// TODO(dborowitz): Remove this case when Java serialization is no longer used.
log.warn(
"Entries cached for "
+ url
+ " have an incompatible class and can't be deserialized. "
+ "Cache is flushed.");
invalidateAll();
} else {
throw e;
}
}
return b;
} catch (IOException | SQLException e) {
log.warn("Cannot build BloomFilter for " + url + ": " + e.getMessage());
c = close(c);
@ -359,9 +372,14 @@ public class H2CacheImpl<K, V> extends AbstractLoadingCache<K, V> implements Per
try {
c = acquire();
if (c.get == null) {
c.get = c.conn.prepareStatement("SELECT v, created FROM data WHERE k=?");
c.get = c.conn.prepareStatement("SELECT v, created FROM data WHERE k=? AND version=?");
}
keyType.set(c.get, 1, key);
// Silently no results when the only value in the database is an older version. This will
// result in put overwriting the stored value with the new version, which is intended.
c.get.setInt(2, version);
try (ResultSet r = c.get.executeQuery()) {
if (!r.next()) {
missCount.incrementAndGet();
@ -414,11 +432,12 @@ public class H2CacheImpl<K, V> extends AbstractLoadingCache<K, V> implements Per
private void touch(SqlHandle c, K key) throws IOException, SQLException {
if (c.touch == null) {
c.touch = c.conn.prepareStatement("UPDATE data SET accessed=? WHERE k=?");
c.touch = c.conn.prepareStatement("UPDATE data SET accessed=? WHERE k=? AND version=?");
}
try {
c.touch.setTimestamp(1, TimeUtil.nowTs());
keyType.set(c.touch, 2, key);
c.touch.setInt(3, version);
c.touch.executeUpdate();
} finally {
c.touch.clearParameters();
@ -441,13 +460,15 @@ public class H2CacheImpl<K, V> extends AbstractLoadingCache<K, V> implements Per
c = acquire();
if (c.put == null) {
c.put =
c.conn.prepareStatement("MERGE INTO data (k, v, created, accessed) VALUES(?,?,?,?)");
c.conn.prepareStatement(
"MERGE INTO data (k, v, version, created, accessed) VALUES(?,?,?,?,?)");
}
try {
keyType.set(c.put, 1, key);
c.put.setBytes(2, valueSerializer.serialize(holder.value));
c.put.setTimestamp(3, new Timestamp(holder.created));
c.put.setTimestamp(4, TimeUtil.nowTs());
c.put.setInt(3, version);
c.put.setTimestamp(4, new Timestamp(holder.created));
c.put.setTimestamp(5, TimeUtil.nowTs());
c.put.executeUpdate();
holder.clean = true;
} finally {
@ -476,10 +497,11 @@ public class H2CacheImpl<K, V> extends AbstractLoadingCache<K, V> implements Per
private void invalidate(SqlHandle c, K key) throws IOException, SQLException {
if (c.invalidate == null) {
c.invalidate = c.conn.prepareStatement("DELETE FROM data WHERE k=?");
c.invalidate = c.conn.prepareStatement("DELETE FROM data WHERE k=? and version=?");
}
try {
keyType.set(c.invalidate, 1, key);
c.invalidate.setInt(2, version);
c.invalidate.executeUpdate();
} finally {
c.invalidate.clearParameters();
@ -506,7 +528,15 @@ public class H2CacheImpl<K, V> extends AbstractLoadingCache<K, V> implements Per
SqlHandle c = null;
try {
c = acquire();
try (PreparedStatement ps = c.conn.prepareStatement("DELETE FROM data WHERE version!=?")) {
ps.setInt(1, version);
int oldEntries = ps.executeUpdate();
log.info(
"Pruned {} entries not matching version {} from cache {}", oldEntries, version, url);
}
try (Statement s = c.conn.createStatement()) {
// Compute size without restricting to version (although obsolete data was just pruned
// anyway).
long used = 0;
try (ResultSet r = s.executeQuery("SELECT SUM(space) FROM data")) {
used = r.next() ? r.getLong(1) : 0;
@ -516,8 +546,7 @@ public class H2CacheImpl<K, V> extends AbstractLoadingCache<K, V> implements Per
}
try (ResultSet r =
s.executeQuery(
"SELECT" + " k" + ",space" + ",created" + " FROM data" + " ORDER BY accessed")) {
s.executeQuery("SELECT k, space, created FROM data ORDER BY accessed")) {
while (maxSize < used && r.next()) {
K key = keyType.get(r, 1);
Timestamp created = r.getTimestamp(3);
@ -545,7 +574,8 @@ public class H2CacheImpl<K, V> extends AbstractLoadingCache<K, V> implements Per
try {
c = acquire();
try (Statement s = c.conn.createStatement();
ResultSet r = s.executeQuery("SELECT" + " COUNT(*)" + ",SUM(space)" + " FROM data")) {
// Stats include total size regardless of version.
ResultSet r = s.executeQuery("SELECT COUNT(*), SUM(space) FROM data")) {
if (r.next()) {
size = r.getLong(1);
space = r.getLong(2);
@ -608,6 +638,7 @@ public class H2CacheImpl<K, V> extends AbstractLoadingCache<K, V> implements Per
stmt.addBatch(
"ALTER TABLE data ADD COLUMN IF NOT EXISTS "
+ "space BIGINT AS OCTET_LENGTH(k) + OCTET_LENGTH(v)");
stmt.addBatch("ALTER TABLE data ADD COLUMN IF NOT EXISTS version INT DEFAULT 0 NOT NULL");
stmt.executeBatch();
}
}

View File

@ -26,33 +26,36 @@ import com.google.gerrit.server.cache.h2.H2CacheImpl.ValueHolder;
import com.google.inject.TypeLiteral;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Before;
import org.junit.Test;
public class H2CacheTest {
private static final TypeLiteral<String> KEY_TYPE = new TypeLiteral<String>() {};
private static final int DEFAULT_VERSION = 1234;
private static int dbCnt;
private Cache<String, ValueHolder<String>> mem;
private H2CacheImpl<String, String> impl;
/** Hands out a fresh database id so each test gets an isolated H2 instance. */
private static int nextDbId() {
dbCnt += 1;
return dbCnt;
}
@Before
public void setUp() {
mem = CacheBuilder.newBuilder().build();
TypeLiteral<String> keyType = new TypeLiteral<String>() {};
private static H2CacheImpl<String, String> newH2CacheImpl(
int id, Cache<String, ValueHolder<String>> mem, int version) {
SqlStore<String, String> store =
new SqlStore<>(
"jdbc:h2:mem:Test_" + (++dbCnt),
keyType,
"jdbc:h2:mem:Test_" + id,
KEY_TYPE,
StringSerializer.INSTANCE,
StringSerializer.INSTANCE,
version,
1 << 20,
0);
impl = new H2CacheImpl<>(MoreExecutors.directExecutor(), store, keyType, mem);
return new H2CacheImpl<>(MoreExecutors.directExecutor(), store, KEY_TYPE, mem);
}
@Test
public void get() throws ExecutionException {
Cache<String, ValueHolder<String>> mem = CacheBuilder.newBuilder().build();
H2CacheImpl<String, String> impl = newH2CacheImpl(nextDbId(), mem, DEFAULT_VERSION);
assertThat(impl.getIfPresent("foo")).isNull();
AtomicBoolean called = new AtomicBoolean();
@ -90,6 +93,37 @@ public class H2CacheTest {
assertThat(StringSerializer.INSTANCE.deserialize(serialized)).isEqualTo(input);
}
// Two caches sharing one H2 database but declaring different versions must
// not see each other's entries, while disk space/stat queries (which are not
// version-filtered) still observe all stored rows.
@Test
public void version() throws Exception {
int id = nextDbId();
H2CacheImpl<String, String> oldImpl = newH2CacheImpl(id, disableMemCache(), DEFAULT_VERSION);
H2CacheImpl<String, String> newImpl =
newH2CacheImpl(id, disableMemCache(), DEFAULT_VERSION + 1);
assertThat(oldImpl.diskStats().space()).isEqualTo(0);
assertThat(newImpl.diskStats().space()).isEqualTo(0);
oldImpl.put("key", "val");
assertThat(oldImpl.getIfPresent("key")).isEqualTo("val");
// 12 is presumably OCTET_LENGTH(k) + OCTET_LENGTH(v) of the serialized
// key/value pair — TODO(review): confirm against StringSerializer's encoding.
assertThat(oldImpl.diskStats().space()).isEqualTo(12);
assertThat(oldImpl.diskStats().hitCount()).isEqualTo(1);
// Can't find key in cache with wrong version, but the data is still there.
assertThat(newImpl.diskStats().requestCount()).isEqualTo(0);
assertThat(newImpl.diskStats().space()).isEqualTo(12);
assertThat(newImpl.getIfPresent("key")).isNull();
assertThat(newImpl.diskStats().space()).isEqualTo(12);
// Re-putting it via the new cache works, and uses the same amount of space.
newImpl.put("key", "val2");
assertThat(newImpl.getIfPresent("key")).isEqualTo("val2");
assertThat(newImpl.diskStats().hitCount()).isEqualTo(1);
assertThat(newImpl.diskStats().space()).isEqualTo(14);
// Now it's no longer in the old cache.
assertThat(oldImpl.diskStats().space()).isEqualTo(14);
assertThat(oldImpl.getIfPresent("key")).isNull();
}
// TODO(dborowitz): Won't be necessary when we use a real StringSerializer in the server code.
private enum StringSerializer implements CacheSerializer<String> {
INSTANCE;
@ -106,4 +140,8 @@ public class H2CacheTest {
return new String(in, UTF_8);
}
}
// Builds a zero-capacity in-memory cache so reads/writes exercise the
// on-disk SQL store rather than being served from memory.
private static <K, V> Cache<K, ValueHolder<V>> disableMemCache() {
return CacheBuilder.newBuilder().maximumSize(0).build();
}
}