Don't return futures from ChangeIndex methods

In I1b3c5ba0 ChangeIndex.replace was modified to return a future, but
ChangeIndexer.Task was not changed to call those futures before
returning.

In retrospect this was a mistake anyway. The only path by which these
methods are currently called is from ChangeIndexerImpl, which turns
around and wraps them in a Callable and submits them to the same
executor, tying up twice as many threads as necessary.

Similarly, to avoid additional fanout in ChangeIndexerImpl, we should
not write to multiple index versions in parallel, so remove the TODO
to try that. The current behavior slows down individual writes for the
relatively short period of an online version upgrade, but that is
acceptable given the improved thread utilization overall.

Change-Id: I5fff470214ecd69a261dec1c10b9f7bdd0a1907e
This commit is contained in:
Dave Borowitz
2013-07-10 15:29:19 -07:00
parent 3d271c0c66
commit fc1bba54fd
7 changed files with 66 additions and 67 deletions

View File

@@ -18,7 +18,6 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.gerrit.server.index.IndexRewriteImpl.CLOSED_STATUSES;
import static com.google.gerrit.server.index.IndexRewriteImpl.OPEN_STATUSES;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
@@ -90,6 +89,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
/**
* Secondary index implementation using Apache Lucene.
@@ -203,54 +203,61 @@ public class LuceneChangeIndex implements ChangeIndex {
@SuppressWarnings("unchecked")
@Override
public ListenableFuture<Void> insert(ChangeData cd) throws IOException {
public void insert(ChangeData cd) throws IOException {
Term id = QueryBuilder.idTerm(cd);
Document doc = toDocument(cd);
try {
if (cd.getChange().getStatus().isOpen()) {
return allOf(
Futures.allAsList(
closedIndex.delete(id),
openIndex.insert(doc));
openIndex.insert(doc)).get();
} else {
return allOf(
Futures.allAsList(
openIndex.delete(id),
closedIndex.insert(doc));
closedIndex.insert(doc)).get();
}
} catch (ExecutionException e) {
throw new IOException(e);
} catch (InterruptedException e) {
throw new IOException(e);
}
}
@SuppressWarnings("unchecked")
@Override
public ListenableFuture<Void> replace(ChangeData cd) throws IOException {
public void replace(ChangeData cd) throws IOException {
Term id = QueryBuilder.idTerm(cd);
Document doc = toDocument(cd);
try {
if (cd.getChange().getStatus().isOpen()) {
return allOf(
Futures.allAsList(
closedIndex.delete(id),
openIndex.replace(id, doc));
openIndex.replace(id, doc)).get();
} else {
return allOf(
Futures.allAsList(
openIndex.delete(id),
closedIndex.replace(id, doc));
closedIndex.replace(id, doc)).get();
}
} catch (ExecutionException e) {
throw new IOException(e);
} catch (InterruptedException e) {
throw new IOException(e);
}
}
@SuppressWarnings("unchecked")
@Override
public ListenableFuture<Void> delete(ChangeData cd) throws IOException {
public void delete(ChangeData cd) throws IOException {
Term id = QueryBuilder.idTerm(cd);
return allOf(
try {
Futures.allAsList(
openIndex.delete(id),
closedIndex.delete(id));
closedIndex.delete(id)).get();
} catch (ExecutionException e) {
throw new IOException(e);
} catch (InterruptedException e) {
throw new IOException(e);
}
private static <V> ListenableFuture<Void> allOf(ListenableFuture<V>... f) {
return Futures.transform(
Futures.allAsList(f),
new Function<List<V>, Void>() {
@Override
public Void apply(List<V> input) {
return null;
}
});
}
@Override

View File

@@ -99,15 +99,15 @@ class SubIndex {
}
}
ListenableFuture<Void> insert(Document doc) throws IOException {
ListenableFuture<?> insert(Document doc) throws IOException {
return new NrtFuture(writer.addDocument(doc));
}
ListenableFuture<Void> replace(Term term, Document doc) throws IOException {
ListenableFuture<?> replace(Term term, Document doc) throws IOException {
return new NrtFuture(writer.updateDocument(term, doc));
}
ListenableFuture<Void> delete(Term term) throws IOException {
ListenableFuture<?> delete(Term term) throws IOException {
return new NrtFuture(writer.deleteDocuments(term));
}

View File

@@ -14,8 +14,6 @@
package com.google.gerrit.server.index;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.gerrit.server.query.Predicate;
import com.google.gerrit.server.query.QueryParseException;
import com.google.gerrit.server.query.change.ChangeData;
@@ -42,18 +40,18 @@ public interface ChangeIndex {
}
@Override
public ListenableFuture<Void> insert(ChangeData cd) throws IOException {
return Futures.immediateFuture(null);
public void insert(ChangeData cd) throws IOException {
// Do nothing.
}
@Override
public ListenableFuture<Void> replace(ChangeData cd) throws IOException {
return Futures.immediateFuture(null);
public void replace(ChangeData cd) throws IOException {
// Do nothing.
}
@Override
public ListenableFuture<Void> delete(ChangeData cd) throws IOException {
return Futures.immediateFuture(null);
public void delete(ChangeData cd) throws IOException {
// Do nothing.
}
@Override
@@ -93,7 +91,7 @@ public interface ChangeIndex {
*
* @throws IOException if the change could not be inserted.
*/
public ListenableFuture<Void> insert(ChangeData cd) throws IOException;
public void insert(ChangeData cd) throws IOException;
/**
* Update a change document in the index.
@@ -106,7 +104,7 @@ public interface ChangeIndex {
*
* @throws IOException
*/
public ListenableFuture<Void> replace(ChangeData cd) throws IOException;
public void replace(ChangeData cd) throws IOException;
/**
* Delete a change document from the index.
@@ -115,7 +113,7 @@ public interface ChangeIndex {
*
* @throws IOException
*/
public ListenableFuture<Void> delete(ChangeData cd) throws IOException;
public void delete(ChangeData cd) throws IOException;
/**
* Delete all change documents from the index.

View File

@@ -42,7 +42,7 @@ public abstract class ChangeIndexer {
}
@Override
public Callable<Void> indexTask(ChangeData cd) {
public Callable<?> indexTask(ChangeData cd) {
return new Callable<Void>() {
@Override
public Void call() {
@@ -52,7 +52,7 @@ public abstract class ChangeIndexer {
}
@Override
public Callable<Void> deleteTask(ChangeData cd) {
public Callable<?> deleteTask(ChangeData cd) {
return new Callable<Void>() {
@Override
public Void call() {
@@ -81,7 +81,7 @@ public abstract class ChangeIndexer {
/**
* Start indexing a change.
*
* @param change change to index.
* @param cd change to index.
* @return future for the indexing task.
*/
public ListenableFuture<?> index(ChangeData cd) {
@@ -96,7 +96,7 @@ public abstract class ChangeIndexer {
* @param cd change to index.
* @return unstarted runnable to index the change.
*/
public abstract Callable<Void> indexTask(ChangeData cd);
public abstract Callable<?> indexTask(ChangeData cd);
/**
* Start deleting a change.
@@ -111,7 +111,7 @@ public abstract class ChangeIndexer {
/**
* Start deleting a change.
*
* @param change change to delete.
* @param cd change to delete.
* @return future for the deleting task.
*/
public ListenableFuture<?> delete(ChangeData cd) {
@@ -126,5 +126,5 @@ public abstract class ChangeIndexer {
* @param cd change to delete.
* @return unstarted runnable to delete the change.
*/
public abstract Callable<Void> deleteTask(ChangeData cd);
public abstract Callable<?> deleteTask(ChangeData cd);
}

View File

@@ -109,7 +109,7 @@ public class ChangeIndexerImpl extends ChangeIndexer {
});
if (indexes != null) {
for (ChangeIndex i : indexes.getWriteIndexes()) {
apply(i, cd); // TODO(dborowitz): Parallelize these
apply(i, cd);
}
} else {
apply(index, cd);

View File

@@ -15,7 +15,6 @@
package com.google.gerrit.server.index;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.gerrit.server.query.Predicate;
import com.google.gerrit.server.query.QueryParseException;
import com.google.gerrit.server.query.change.ChangeData;
@@ -69,17 +68,17 @@ class FakeIndex implements ChangeIndex {
}
@Override
public ListenableFuture<Void> insert(ChangeData cd) {
public void insert(ChangeData cd) {
throw new UnsupportedOperationException();
}
@Override
public ListenableFuture<Void> replace(ChangeData cd) {
public void replace(ChangeData cd) {
throw new UnsupportedOperationException();
}
@Override
public ListenableFuture<Void> delete(ChangeData cd) {
public void delete(ChangeData cd) {
throw new UnsupportedOperationException();
}

View File

@@ -22,8 +22,6 @@ import static com.google.gerrit.solr.IndexVersionCheck.solrIndexConfig;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.lucene.QueryBuilder;
import com.google.gerrit.reviewdb.client.Change;
@@ -126,7 +124,7 @@ class SolrChangeIndex implements ChangeIndex, LifecycleListener {
}
@Override
public ListenableFuture<Void> insert(ChangeData cd) throws IOException {
public void insert(ChangeData cd) throws IOException {
String id = cd.getId().toString();
SolrInputDocument doc = toDocument(cd);
try {
@@ -142,11 +140,10 @@ class SolrChangeIndex implements ChangeIndex, LifecycleListener {
}
commit(openIndex);
commit(closedIndex);
return Futures.immediateFuture(null);
}
@Override
public ListenableFuture<Void> replace(ChangeData cd) throws IOException {
public void replace(ChangeData cd) throws IOException {
String id = cd.getId().toString();
SolrInputDocument doc = toDocument(cd);
try {
@@ -162,11 +159,10 @@ class SolrChangeIndex implements ChangeIndex, LifecycleListener {
}
commit(openIndex);
commit(closedIndex);
return Futures.immediateFuture(null);
}
@Override
public ListenableFuture<Void> delete(ChangeData cd) throws IOException {
public void delete(ChangeData cd) throws IOException {
String id = cd.getId().toString();
try {
if (cd.getChange().getStatus().isOpen()) {
@@ -176,7 +172,6 @@ class SolrChangeIndex implements ChangeIndex, LifecycleListener {
closedIndex.deleteById(id);
commit(closedIndex);
}
return Futures.immediateFuture(null);
} catch (SolrServerException e) {
throw new IOException(e);
}