diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/index/change/ChangeIndexer.java b/gerrit-server/src/main/java/com/google/gerrit/server/index/change/ChangeIndexer.java index f1a7e859cd..6d1971e2d4 100644 --- a/gerrit-server/src/main/java/com/google/gerrit/server/index/change/ChangeIndexer.java +++ b/gerrit-server/src/main/java/com/google/gerrit/server/index/change/ChangeIndexer.java @@ -18,6 +18,7 @@ import static com.google.gerrit.server.extensions.events.EventUtil.logEventListe import static com.google.gerrit.server.git.QueueProvider.QueueType.BATCH; import com.google.common.base.Function; +import com.google.common.base.Objects; import com.google.common.util.concurrent.Atomics; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -50,7 +51,9 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicReference; @@ -111,6 +114,11 @@ public class ChangeIndexer { private final StalenessChecker stalenessChecker; private final boolean autoReindexIfStale; + private final Set queuedIndexTasks = + Collections.newSetFromMap(new ConcurrentHashMap<>()); + private final Set queuedReindexIfStaleTasks = + Collections.newSetFromMap(new ConcurrentHashMap<>()); + @AssistedInject ChangeIndexer( @GerritServerConfig Config cfg, @@ -178,7 +186,11 @@ public class ChangeIndexer { @SuppressWarnings("deprecation") public com.google.common.util.concurrent.CheckedFuture indexAsync( Project.NameKey project, Change.Id id) { - return submit(new IndexTask(project, id)); + IndexTask task = new IndexTask(project, id); + if (queuedIndexTasks.add(task)) { + return submit(task); + } + return Futures.immediateCheckedFuture(null); } /** @@ -309,7 +321,11 @@ public class ChangeIndexer { @SuppressWarnings("deprecation") public com.google.common.util.concurrent.CheckedFuture reindexIfStale( Project.NameKey project, Change.Id id) { - return submit(new ReindexIfStaleTask(project, id), batchExecutor); + ReindexIfStaleTask task = new ReindexIfStaleTask(project, id); + if (queuedReindexIfStaleTasks.add(task)) { + return submit(task, batchExecutor); + } + return Futures.immediateCheckedFuture(false); } private void autoReindexIfStale(ChangeData cd) { @@ -351,6 +367,8 @@ public class ChangeIndexer { protected abstract T callImpl(Provider db) throws Exception; + protected abstract void remove(); + @Override public abstract String toString(); @@ -405,15 +423,35 @@ public class ChangeIndexer { @Override public Void callImpl(Provider db) throws Exception { + remove(); ChangeData cd = newChangeData(db.get(), project, id); index(cd); return null; } + @Override + public int hashCode() { + return Objects.hashCode(IndexTask.class, id.get()); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof IndexTask)) { + return false; + } + IndexTask other = (IndexTask) obj; + return id.get() == other.id.get(); + } + @Override public String toString() { return "index-change-" + id; } + + @Override + protected void remove() { + queuedIndexTasks.remove(this); + } } // Not AbstractIndexTask as it doesn't need ReviewDb. @@ -445,6 +483,7 @@ public class ChangeIndexer { @Override public Boolean callImpl(Provider db) throws Exception { + remove(); try { if (stalenessChecker.isStale(id)) { index(newChangeData(db.get(), project, id)); @@ -464,10 +503,29 @@ public class ChangeIndexer { return false; } + @Override + public int hashCode() { + return Objects.hashCode(ReindexIfStaleTask.class, id.get()); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof ReindexIfStaleTask)) { + return false; + } + ReindexIfStaleTask other = (ReindexIfStaleTask) obj; + return id.get() == other.id.get(); + } + @Override public String toString() { return "reindex-if-stale-change-" + id; } + + @Override + protected void remove() { + queuedReindexIfStaleTasks.remove(this); + } } private boolean isCausedByRepositoryNotFoundException(Throwable throwable) { diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/index/change/ReindexAfterRefUpdate.java b/gerrit-server/src/main/java/com/google/gerrit/server/index/change/ReindexAfterRefUpdate.java index a2a4507cc9..a59557f375 100644 --- a/gerrit-server/src/main/java/com/google/gerrit/server/index/change/ReindexAfterRefUpdate.java +++ b/gerrit-server/src/main/java/com/google/gerrit/server/index/change/ReindexAfterRefUpdate.java @@ -17,6 +17,7 @@ package com.google.gerrit.server.index.change; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static com.google.gerrit.server.query.change.ChangeData.asChanges; +import com.google.common.base.Objects; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListeningExecutorService; @@ -42,8 +43,11 @@ import com.google.gwtorm.server.OrmException; import com.google.inject.Inject; import com.google.inject.Provider; import java.io.IOException; +import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; import org.eclipse.jgit.lib.Config; import org.slf4j.Logger; @@ -62,6 +66,8 @@ public class ReindexAfterRefUpdate implements GitReferenceUpdatedListener { private final ListeningExecutorService executor; private final boolean enabled; + private final Set queuedIndexTasks = Collections.newSetFromMap(new ConcurrentHashMap<>()); + @Inject ReindexAfterRefUpdate( @GerritServerConfig Config cfg, @@ -109,9 +115,12 @@ public class ReindexAfterRefUpdate implements GitReferenceUpdatedListener { @Override public void onSuccess(List changes) { for (Change c : changes) { - // Don't retry indefinitely; if this fails changes may be stale. - @SuppressWarnings("unused") - Future possiblyIgnoredError = executor.submit(new Index(event, c.getId())); + Index task = new Index(event, c.getId()); + if (queuedIndexTasks.add(task)) { + // Don't retry indefinitely; if this fails changes may be stale. + @SuppressWarnings("unused") + Future possiblyIgnoredError = executor.submit(task); + } } } @@ -141,6 +150,8 @@ public class ReindexAfterRefUpdate implements GitReferenceUpdatedListener { } protected abstract V impl(RequestContext ctx) throws Exception; + + protected abstract void remove(); } private class GetChanges extends Task> { @@ -165,6 +176,9 @@ public class ReindexAfterRefUpdate implements GitReferenceUpdatedListener { + " update of project " + event.getProjectName(); } + + @Override + protected void remove() {} } private class Index extends Task { @@ -179,6 +193,7 @@ public class ReindexAfterRefUpdate implements GitReferenceUpdatedListener { protected Void impl(RequestContext ctx) throws OrmException, IOException { // Reload change, as some time may have passed since GetChanges. ReviewDb db = ctx.getReviewDbProvider().get(); + remove(); try { Change c = notesFactory @@ -191,9 +206,28 @@ public class ReindexAfterRefUpdate implements GitReferenceUpdatedListener { return null; } + @Override + public int hashCode() { + return Objects.hashCode(Index.class, id.get()); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof Index)) { + return false; + } + Index other = (Index) obj; + return id.get() == other.id.get(); + } + @Override public String toString() { return "Index change " + id.get() + " of project " + event.getProjectName(); } + + @Override + protected void remove() { + queuedIndexTasks.remove(this); + } } }