Fix memory leak of SubIndex.NrtFuture objects

The SubIndex.NrtFuture objects are added as listeners of
searchManager who was found to hold on to them forever.

Fixed. There are also some code refactoring in NrtFuture.

Change-Id: If87cb62890a1cfa6c6336f6c7953a1cb56d42937
This commit is contained in:
Bruce Zu
2014-03-28 10:40:50 +08:00
parent 902686a8d8
commit 4174e56a9a

View File

@@ -16,7 +16,7 @@ package com.google.gerrit.lucene;
import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MILLISECONDS;
import com.google.common.collect.Maps; import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AbstractFuture; import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -38,13 +38,12 @@ import org.slf4j.LoggerFactory;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.ConcurrentMap; import java.util.Set;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
/** Piece of the change index that is implemented as a separate Lucene index. */ /** Piece of the change index that is implemented as a separate Lucene index. */
class SubIndex { class SubIndex {
@@ -54,7 +53,7 @@ class SubIndex {
private final TrackingIndexWriter writer; private final TrackingIndexWriter writer;
private final SearcherManager searcherManager; private final SearcherManager searcherManager;
private final ControlledRealTimeReopenThread<IndexSearcher> reopenThread; private final ControlledRealTimeReopenThread<IndexSearcher> reopenThread;
private final ConcurrentMap<RefreshListener, Boolean> refreshListeners; private final Set<NrtFuture> notDoneNrtFutures;
SubIndex(File file, GerritIndexWriterConfig writerConfig) throws IOException { SubIndex(File file, GerritIndexWriterConfig writerConfig) throws IOException {
this(FSDirectory.open(file), file.getName(), writerConfig); this(FSDirectory.open(file), file.getName(), writerConfig);
@@ -106,7 +105,7 @@ class SubIndex {
searcherManager = new SearcherManager( searcherManager = new SearcherManager(
writer.getIndexWriter(), true, new SearcherFactory()); writer.getIndexWriter(), true, new SearcherFactory());
refreshListeners = Maps.newConcurrentMap(); notDoneNrtFutures = Sets.newConcurrentHashSet();
searcherManager.addListener(new RefreshListener() { searcherManager.addListener(new RefreshListener() {
@Override @Override
public void beforeRefresh() throws IOException { public void beforeRefresh() throws IOException {
@@ -114,8 +113,8 @@ class SubIndex {
@Override @Override
public void afterRefresh(boolean didRefresh) throws IOException { public void afterRefresh(boolean didRefresh) throws IOException {
for (RefreshListener l : refreshListeners.keySet()) { for (NrtFuture f : notDoneNrtFutures) {
l.afterRefresh(didRefresh); f.removeIfDone();
} }
} }
}); });
@@ -171,10 +170,8 @@ class SubIndex {
searcherManager.release(searcher); searcherManager.release(searcher);
} }
private final class NrtFuture extends AbstractFuture<Void> private final class NrtFuture extends AbstractFuture<Void> {
implements RefreshListener {
private final long gen; private final long gen;
private final AtomicBoolean hasListeners = new AtomicBoolean();
NrtFuture(long gen) { NrtFuture(long gen) {
this.gen = gen; this.gen = gen;
@@ -193,9 +190,12 @@ class SubIndex {
public Void get(long timeout, TimeUnit unit) throws InterruptedException, public Void get(long timeout, TimeUnit unit) throws InterruptedException,
TimeoutException, ExecutionException { TimeoutException, ExecutionException {
if (!isDone()) { if (!isDone()) {
reopenThread.waitForGeneration(gen, if (reopenThread.waitForGeneration(gen,
(int) MILLISECONDS.convert(timeout, unit)); (int) MILLISECONDS.convert(timeout, unit))) {
set(null); set(null);
} else {
throw new TimeoutException();
}
} }
return super.get(timeout, unit); return super.get(timeout, unit);
} }
@@ -204,7 +204,7 @@ class SubIndex {
public boolean isDone() { public boolean isDone() {
if (super.isDone()) { if (super.isDone()) {
return true; return true;
} else if (isSearcherCurrent()) { } else if (isGenAvailableNowForCurrentSearcher()) {
set(null); set(null);
return true; return true;
} }
@@ -213,33 +213,31 @@ class SubIndex {
@Override @Override
public void addListener(Runnable listener, Executor executor) { public void addListener(Runnable listener, Executor executor) {
if (hasListeners.compareAndSet(false, true) && !isDone()) { if (!isDone()) {
searcherManager.addListener(this); notDoneNrtFutures.add(this);
} }
super.addListener(listener, executor); super.addListener(listener, executor);
} }
@Override @Override
public boolean cancel(boolean mayInterruptIfRunning) { public boolean cancel(boolean mayInterruptIfRunning) {
if (hasListeners.get()) { boolean result = super.cancel(mayInterruptIfRunning);
refreshListeners.put(this, true); if (result) {
notDoneNrtFutures.remove(this);
} }
return super.cancel(mayInterruptIfRunning); return result;
} }
@Override void removeIfDone() {
public void beforeRefresh() throws IOException { if (isGenAvailableNowForCurrentSearcher()) {
} notDoneNrtFutures.remove(this);
if (!isCancelled()) {
@Override set(null);
public void afterRefresh(boolean didRefresh) throws IOException { }
if (isSearcherCurrent()) {
refreshListeners.remove(this);
set(null);
} }
} }
private boolean isSearcherCurrent() { private boolean isGenAvailableNowForCurrentSearcher() {
try { try {
return reopenThread.waitForGeneration(gen, 0); return reopenThread.waitForGeneration(gen, 0);
} catch (InterruptedException e) { } catch (InterruptedException e) {