ChangeIndexer: Set request contexts more correctly

For the non-async calls, don't use Task, as it's executing in the same
thread so doesn't need to open a new ReviewDb and set the
RequestContext.

Delete the async calls that take ChangeData (which were unused).
ChangeData's internal data structures are very much not threadsafe.
This means we might be duplicating some work if the caller had
available a partially-populated ChangeData, but not a whole lot, and
it's async, so slower is less worrisome (and besides, these methods
were unused),

In the future we might inject a ReviewDb into ChangeData at creation
time instead of passing it in to practically every method; that would
make it so ChangeData must always be constructed from the thread where
it's used, in this case inside Task.call().

Change-Id: Ic292cdd6ac2e81a7450626402d5eb4ce6b4c3083
This commit is contained in:
Dave Borowitz
2013-12-20 14:37:58 -08:00
parent 4a5dffbe0b
commit 24b40a9592

View File

@@ -38,6 +38,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
@@ -109,18 +111,8 @@ public class ChangeIndexer {
* @return future for the indexing task.
*/
public CheckedFuture<?, IOException> indexAsync(Change change) {
return indexAsync(new ChangeData(change));
}
/**
* Start indexing a change.
*
* @param cd change to index.
* @return future for the indexing task.
*/
public CheckedFuture<?, IOException> indexAsync(ChangeData cd) {
return executor != null
? submit(new Task(cd, false))
? submit(new Task(new ChangeData(change), false))
: Futures.<Object, IOException> immediateCheckedFuture(null);
}
@@ -139,12 +131,8 @@ public class ChangeIndexer {
* @param cd change to index.
*/
public void index(ChangeData cd) throws IOException {
try {
new Task(cd, false).call();
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw MAPPER.apply(e);
for (ChangeIndex i : getWriteIndexes()) {
i.replace(cd);
}
}
@@ -155,18 +143,8 @@ public class ChangeIndexer {
* @return future for the deleting task.
*/
public CheckedFuture<?, IOException> deleteAsync(Change change) {
return deleteAsync(new ChangeData(change));
}
/**
* Start deleting a change.
*
* @param cd change to delete.
* @return future for the deleting task.
*/
public CheckedFuture<?, IOException> deleteAsync(ChangeData cd) {
return executor != null
? submit(new Task(cd, true))
? submit(new Task(new ChangeData(change), true))
: Futures.<Object, IOException> immediateCheckedFuture(null);
}
@@ -185,15 +163,17 @@ public class ChangeIndexer {
* @param cd change to delete.
*/
public void delete(ChangeData cd) throws IOException {
try {
new Task(cd, true).call();
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw MAPPER.apply(e);
for (ChangeIndex i : getWriteIndexes()) {
i.delete(cd);
}
}
private Collection<ChangeIndex> getWriteIndexes() {
return indexes != null
? indexes.getWriteIndexes()
: Collections.singleton(index);
}
private CheckedFuture<?, IOException> submit(Callable<?> task) {
return Futures.makeChecked(executor.submit(task), MAPPER);
}
@@ -236,12 +216,14 @@ public class ChangeIndexer {
}
});
try {
if (indexes != null) {
for (ChangeIndex i : indexes.getWriteIndexes()) {
apply(i, cd);
if (delete) {
for (ChangeIndex i : getWriteIndexes()) {
i.delete(cd);
}
} else {
apply(index, cd);
for (ChangeIndex i : getWriteIndexes()) {
i.replace(cd);
}
}
return null;
} finally {
@@ -259,14 +241,6 @@ public class ChangeIndexer {
}
}
private void apply(ChangeIndex i, ChangeData cd) throws IOException {
if (delete) {
i.delete(cd);
} else {
i.replace(cd);
}
}
@Override
public String toString() {
return "index-change-" + cd.getId().get();