Implement refreshAfterWrite for in-memory and serialized caches

Change-Id: Ibc188bf663f49f784d0e33b391ea1a99ce00b4b8
This commit is contained in:
Patrick Hiesel
2020-04-15 09:28:37 +02:00
parent 3a7690236f
commit ef9fafd8f9
9 changed files with 205 additions and 31 deletions

View File

@@ -29,6 +29,13 @@ public interface CacheBinding<K, V> {
/** Set the time an element lives after last access before being expired. */
CacheBinding<K, V> expireFromMemoryAfterAccess(Duration duration);
/**
* Set the time that an element will be refreshed after. Elements older than this but younger than
* {@link #expireAfterWrite(Duration)} will still be returned, but on access a task is queued to
* refresh their value asynchronously.
*/
CacheBinding<K, V> refreshAfterWrite(Duration duration);
/** Populate the cache with items from the CacheLoader. */
CacheBinding<K, V> loader(Class<? extends CacheLoader<K, V>> clazz);

View File

@@ -50,6 +50,9 @@ public interface CacheDef<K, V> {
@Nullable
Duration expireFromMemoryAfterAccess();
@Nullable
Duration refreshAfterWrite();
@Nullable
Weigher<K, V> weigher();

View File

@@ -38,6 +38,7 @@ class CacheProvider<K, V> implements Provider<Cache<K, V>>, CacheBinding<K, V>,
private long maximumWeight;
private Duration expireAfterWrite;
private Duration expireFromMemoryAfterAccess;
private Duration refreshAfterWrite;
private Provider<CacheLoader<K, V>> loader;
private Provider<Weigher<K, V>> weigher;
@@ -89,6 +90,13 @@ class CacheProvider<K, V> implements Provider<Cache<K, V>>, CacheBinding<K, V>,
return this;
}
@Override
public CacheBinding<K, V> refreshAfterWrite(Duration duration) {
checkNotFrozen();
refreshAfterWrite = duration;
return this;
}
@Override
public CacheBinding<K, V> loader(Class<? extends CacheLoader<K, V>> impl) {
checkNotFrozen();
@@ -150,6 +158,11 @@ class CacheProvider<K, V> implements Provider<Cache<K, V>>, CacheBinding<K, V>,
return expireFromMemoryAfterAccess;
}
@Override
public Duration refreshAfterWrite() {
return refreshAfterWrite;
}
@Override
@Nullable
public Weigher<K, V> weigher() {

View File

@@ -42,6 +42,11 @@ class H2CacheDefProxy<K, V> implements PersistentCacheDef<K, V> {
return source.expireFromMemoryAfterAccess();
}
@Override
public Duration refreshAfterWrite() {
return source.refreshAfterWrite();
}
@Override
public Weigher<K, V> weigher() {
Weigher<K, V> weigher = source.weigher();

View File

@@ -237,6 +237,7 @@ class H2CacheFactory implements PersistentCacheFactory, LifecycleListener {
def.valueSerializer(),
def.version(),
maxSize,
def.expireAfterWrite());
def.expireAfterWrite(),
def.expireFromMemoryAfterAccess());
}
}

View File

@@ -23,6 +23,9 @@ import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableSet;
import com.google.common.flogger.FluentLogger;
import com.google.common.hash.BloomFilter;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.gerrit.common.Nullable;
import com.google.gerrit.server.cache.PersistentCache;
import com.google.gerrit.server.cache.serialize.CacheSerializer;
@@ -40,6 +43,7 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.time.Duration;
import java.time.Instant;
import java.util.Calendar;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
@@ -122,7 +126,12 @@ public class H2CacheImpl<K, V> extends AbstractLoadingCache<K, V> implements Per
@Override
public V get(K key) throws ExecutionException {
if (mem instanceof LoadingCache) {
return ((LoadingCache<K, ValueHolder<V>>) mem).get(key).value;
LoadingCache<K, ValueHolder<V>> asLoadingCache = (LoadingCache<K, ValueHolder<V>>) mem;
ValueHolder<V> valueHolder = asLoadingCache.get(key);
if (store.needsRefresh(valueHolder.created)) {
asLoadingCache.refresh(key);
}
return valueHolder.value;
}
throw new UnsupportedOperationException();
}
@@ -139,8 +148,8 @@ public class H2CacheImpl<K, V> extends AbstractLoadingCache<K, V> implements Per
}
}
ValueHolder<V> h = new ValueHolder<>(valueLoader.call());
h.created = TimeUtil.nowMs();
ValueHolder<V> h =
new ValueHolder<>(valueLoader.call(), Instant.ofEpochMilli(TimeUtil.nowMs()));
executor.execute(() -> store.put(key, h));
return h;
})
@@ -149,8 +158,7 @@ public class H2CacheImpl<K, V> extends AbstractLoadingCache<K, V> implements Per
@Override
public void put(K key, V val) {
final ValueHolder<V> h = new ValueHolder<>(val);
h.created = TimeUtil.nowMs();
final ValueHolder<V> h = new ValueHolder<>(val, Instant.ofEpochMilli(TimeUtil.nowMs()));
mem.put(key, h);
executor.execute(() -> store.put(key, h));
}
@@ -217,11 +225,12 @@ public class H2CacheImpl<K, V> extends AbstractLoadingCache<K, V> implements Per
static class ValueHolder<V> {
final V value;
long created;
final Instant created;
volatile boolean clean;
ValueHolder(V value) {
ValueHolder(V value, Instant created) {
this.value = value;
this.created = created;
}
}
@@ -248,12 +257,34 @@ public class H2CacheImpl<K, V> extends AbstractLoadingCache<K, V> implements Per
}
}
final ValueHolder<V> h = new ValueHolder<>(loader.load(key));
h.created = TimeUtil.nowMs();
final ValueHolder<V> h =
new ValueHolder<>(loader.load(key), Instant.ofEpochMilli(TimeUtil.nowMs()));
executor.execute(() -> store.put(key, h));
return h;
}
}
@Override
public ListenableFuture<ValueHolder<V>> reload(K key, ValueHolder<V> oldValue)
throws Exception {
ListenableFuture<V> reloadedValue = loader.reload(key, oldValue.value);
Futures.addCallback(
reloadedValue,
new FutureCallback<V>() {
@Override
public void onSuccess(V result) {
store.put(key, new ValueHolder<>(result, TimeUtil.now()));
}
@Override
public void onFailure(Throwable t) {
logger.atWarning().withCause(t).log("Unable to reload cache value");
}
},
executor);
return Futures.transform(reloadedValue, v -> new ValueHolder<>(v, TimeUtil.now()), executor);
}
}
static class SqlStore<K, V> {
@@ -263,6 +294,7 @@ public class H2CacheImpl<K, V> extends AbstractLoadingCache<K, V> implements Per
private final int version;
private final long maxSize;
@Nullable private final Duration expireAfterWrite;
@Nullable private final Duration refreshAfterWrite;
private final BlockingQueue<SqlHandle> handles;
private final AtomicLong hitCount = new AtomicLong();
private final AtomicLong missCount = new AtomicLong();
@@ -276,13 +308,15 @@ public class H2CacheImpl<K, V> extends AbstractLoadingCache<K, V> implements Per
CacheSerializer<V> valueSerializer,
int version,
long maxSize,
@Nullable Duration expireAfterWrite) {
@Nullable Duration expireAfterWrite,
@Nullable Duration refreshAfterWrite) {
this.url = jdbcUrl;
this.keyType = createKeyType(keyType, keySerializer);
this.valueSerializer = valueSerializer;
this.version = version;
this.maxSize = maxSize;
this.expireAfterWrite = expireAfterWrite;
this.refreshAfterWrite = refreshAfterWrite;
int cores = Runtime.getRuntime().availableProcessors();
int keep = Math.min(cores, 16);
@@ -394,14 +428,14 @@ public class H2CacheImpl<K, V> extends AbstractLoadingCache<K, V> implements Per
}
Timestamp created = r.getTimestamp(2);
if (expired(created)) {
if (expired(created.toInstant())) {
invalidate(key);
missCount.incrementAndGet();
return null;
}
V val = valueSerializer.deserialize(r.getBytes(1));
ValueHolder<V> h = new ValueHolder<>(val);
ValueHolder<V> h = new ValueHolder<>(val, created.toInstant());
h.clean = true;
hitCount.incrementAndGet();
touch(c, key);
@@ -429,14 +463,22 @@ public class H2CacheImpl<K, V> extends AbstractLoadingCache<K, V> implements Per
return false;
}
private boolean expired(Timestamp created) {
private boolean expired(Instant created) {
if (expireAfterWrite == null) {
return false;
}
Duration age = Duration.between(created.toInstant(), TimeUtil.now());
Duration age = Duration.between(created, TimeUtil.now());
return age.compareTo(expireAfterWrite) > 0;
}
private boolean needsRefresh(Instant created) {
if (refreshAfterWrite == null) {
return false;
}
Duration age = Duration.between(created, TimeUtil.now());
return age.compareTo(refreshAfterWrite) > 0;
}
private void touch(SqlHandle c, K key) throws IOException, SQLException {
if (c.touch == null) {
c.touch = c.conn.prepareStatement("UPDATE data SET accessed=? WHERE k=? AND version=?");
@@ -474,7 +516,7 @@ public class H2CacheImpl<K, V> extends AbstractLoadingCache<K, V> implements Per
keyType.set(c.put, 1, key);
c.put.setBytes(2, valueSerializer.serialize(holder.value));
c.put.setInt(3, version);
c.put.setTimestamp(4, new Timestamp(holder.created));
c.put.setTimestamp(4, Timestamp.from(holder.created));
c.put.setTimestamp(5, TimeUtil.nowTs());
c.put.executeUpdate();
holder.clean = true;
@@ -560,7 +602,7 @@ public class H2CacheImpl<K, V> extends AbstractLoadingCache<K, V> implements Per
while (maxSize < used && r.next()) {
K key = keyType.get(r, 1);
Timestamp created = r.getTimestamp(3);
if (mem.getIfPresent(key) != null && !expired(created)) {
if (mem.getIfPresent(key) != null && !expired(created.toInstant())) {
touch(c, key);
} else {
invalidate(c, key);

View File

@@ -105,6 +105,21 @@ class DefaultMemoryCacheFactory implements MemoryCacheFactory {
builder.expireAfterAccess(expireAfterAccess.toNanos(), NANOSECONDS);
}
Duration refreshAfterWrite = def.refreshAfterWrite();
if (has(def.configKey(), "refreshAfterWrite")) {
builder.refreshAfterWrite(
ConfigUtil.getTimeUnit(
cfg,
"cache",
def.configKey(),
"refreshAfterWrite",
toSeconds(refreshAfterWrite),
SECONDS),
SECONDS);
} else if (refreshAfterWrite != null) {
builder.refreshAfterWrite(refreshAfterWrite.toNanos(), NANOSECONDS);
}
return builder;
}
@@ -141,6 +156,21 @@ class DefaultMemoryCacheFactory implements MemoryCacheFactory {
builder.expireAfterAccess(expireAfterAccess.toNanos(), NANOSECONDS);
}
Duration refreshAfterWrite = def.refreshAfterWrite();
if (has(def.configKey(), "refreshAfterWrite")) {
builder.expireAfterAccess(
ConfigUtil.getTimeUnit(
cfg,
"cache",
def.configKey(),
"refreshAfterWrite",
toSeconds(refreshAfterWrite),
SECONDS),
SECONDS);
} else if (refreshAfterWrite != null) {
builder.refreshAfterWrite(refreshAfterWrite.toNanos(), NANOSECONDS);
}
return builder;
}

View File

@@ -6,10 +6,12 @@ junit_tests(
deps = [
"//java/com/google/gerrit/server/cache/h2",
"//java/com/google/gerrit/server/cache/serialize",
"//java/com/google/gerrit/server/util/time",
"//lib:guava",
"//lib:h2",
"//lib:junit",
"//lib/guice",
"//lib/mockito",
"//lib/truth",
],
)

View File

@@ -16,16 +16,27 @@ package com.google.gerrit.server.cache.h2;
import static com.google.common.truth.Truth.assertThat;
import static com.google.common.truth.Truth.assertWithMessage;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.gerrit.server.cache.h2.H2CacheImpl.SqlStore;
import com.google.gerrit.server.cache.h2.H2CacheImpl.ValueHolder;
import com.google.gerrit.server.cache.serialize.StringCacheSerializer;
import com.google.gerrit.server.util.time.TimeUtil;
import com.google.inject.TypeLiteral;
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import org.junit.Test;
public class H2CacheTest {
@@ -38,23 +49,31 @@ public class H2CacheTest {
}
private static H2CacheImpl<String, String> newH2CacheImpl(
int id, Cache<String, ValueHolder<String>> mem, int version) {
SqlStore<String, String> store =
new SqlStore<>(
"jdbc:h2:mem:Test_" + id,
KEY_TYPE,
StringCacheSerializer.INSTANCE,
StringCacheSerializer.INSTANCE,
version,
1 << 20,
null);
SqlStore<String, String> store, Cache<String, ValueHolder<String>> mem) {
return new H2CacheImpl<>(MoreExecutors.directExecutor(), store, KEY_TYPE, mem);
}
private static SqlStore<String, String> newStore(
int id,
int version,
@Nullable Duration expireAfterWrite,
@Nullable Duration refreshAfterWrite) {
return new SqlStore<>(
"jdbc:h2:mem:Test_" + id,
KEY_TYPE,
StringCacheSerializer.INSTANCE,
StringCacheSerializer.INSTANCE,
version,
1 << 20,
expireAfterWrite,
refreshAfterWrite);
}
@Test
public void get() throws ExecutionException {
Cache<String, ValueHolder<String>> mem = CacheBuilder.newBuilder().build();
H2CacheImpl<String, String> impl = newH2CacheImpl(nextDbId(), mem, DEFAULT_VERSION);
H2CacheImpl<String, String> impl =
newH2CacheImpl(newStore(nextDbId(), DEFAULT_VERSION, null, null), mem);
assertThat(impl.getIfPresent("foo")).isNull();
@@ -94,11 +113,12 @@ public class H2CacheTest {
}
@Test
public void version() throws Exception {
public void version() {
int id = nextDbId();
H2CacheImpl<String, String> oldImpl = newH2CacheImpl(id, disableMemCache(), DEFAULT_VERSION);
H2CacheImpl<String, String> oldImpl =
newH2CacheImpl(newStore(id, DEFAULT_VERSION, null, null), disableMemCache());
H2CacheImpl<String, String> newImpl =
newH2CacheImpl(id, disableMemCache(), DEFAULT_VERSION + 1);
newH2CacheImpl(newStore(id, DEFAULT_VERSION + 1, null, null), disableMemCache());
assertThat(oldImpl.diskStats().space()).isEqualTo(0);
assertThat(newImpl.diskStats().space()).isEqualTo(0);
@@ -124,6 +144,57 @@ public class H2CacheTest {
assertThat(oldImpl.getIfPresent("key")).isNull();
}
@Test
public void refreshAfterWrite_triggeredWhenConfigured() throws Exception {
SqlStore<String, String> store =
newStore(nextDbId(), DEFAULT_VERSION, null, Duration.ofMillis(10));
// This is the loader that we configure for the cache when calling .loader(...)
@SuppressWarnings("unchecked")
CacheLoader<String, String> baseLoader = (CacheLoader<String, String>) mock(CacheLoader.class);
resetLoaderAndAnswerLoadAndRefreshCalls(baseLoader);
// We wrap baseLoader just like H2CacheFactory is wrapping it. The wrapped version will call out
// to the store for refreshing values.
H2CacheImpl.Loader<String, String> wrappedLoader =
new H2CacheImpl.Loader<>(MoreExecutors.directExecutor(), store, baseLoader);
// memCache is the in-memory variant of the cache. Its loader is wrappedLoader which will call
// out to the store to save or delete cached values.
LoadingCache<String, ValueHolder<String>> memCache =
CacheBuilder.newBuilder().maximumSize(10).build(wrappedLoader);
// h2Cache puts it all together
H2CacheImpl<String, String> h2Cache = newH2CacheImpl(store, memCache);
// Initial load and cache retrieval do not trigger refresh
// This works because we use a directExecutor() for refreshes
TimeUtil.setCurrentMillisSupplier(() -> 0);
assertThat(h2Cache.get("foo")).isEqualTo("load:foo");
verify(baseLoader).load("foo");
assertThat(h2Cache.get("foo")).isEqualTo("load:foo");
verifyNoMoreInteractions(baseLoader);
resetLoaderAndAnswerLoadAndRefreshCalls(baseLoader);
// Load after refresh duration returns old value, triggers refresh and returns new value
TimeUtil.setCurrentMillisSupplier(() -> 11);
assertThat(h2Cache.get("foo")).isEqualTo("load:foo");
assertThat(h2Cache.get("foo")).isEqualTo("reload:foo");
verify(baseLoader).reload("foo", "load:foo");
verifyNoMoreInteractions(baseLoader);
resetLoaderAndAnswerLoadAndRefreshCalls(baseLoader);
// Refreshed value was persisted
memCache.invalidateAll(); // Invalidates only the memcache, not the store.
assertThat(h2Cache.getIfPresent("foo")).isEqualTo("reload:foo");
}
private static void resetLoaderAndAnswerLoadAndRefreshCalls(CacheLoader<String, String> loader)
throws Exception {
reset(loader);
when(loader.load("foo")).thenReturn("load:foo");
when(loader.reload("foo", "load:foo")).thenReturn(Futures.immediateFuture("reload:foo"));
}
private static <K, V> Cache<K, ValueHolder<V>> disableMemCache() {
return CacheBuilder.newBuilder().maximumSize(0).build();
}