Callable: Replace classes with lambda expression

In Java 8 Callable is functional interface and can be replaced with
lambda expressions.

Change-Id: I32907f32c9dc3f394da64f22c384e8f3c33670d4
This commit is contained in:
David Ostrovsky
2017-03-26 21:03:32 +02:00
parent baf902fae0
commit bfa7a2f0a7
15 changed files with 160 additions and 276 deletions

View File

@@ -45,7 +45,6 @@ import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.file.Paths;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -170,24 +169,17 @@ public class GerritServer {
@SuppressWarnings("unused")
Future<?> possiblyIgnoredError =
daemonService.submit(
new Callable<Void>() {
@Override
public Void call() throws Exception {
() -> {
int rc =
daemon.main(
new String[] {
"-d",
site.getPath(),
"--headless",
"--console-log",
"--show-stack-trace",
"-d", site.getPath(), "--headless", "--console-log", "--show-stack-trace",
});
if (rc != 0) {
System.err.println("Failed to start Gerrit daemon");
serverStarted.reset();
}
return null;
}
});
serverStarted.await();
System.out.println("Gerrit Server Started");

View File

@@ -131,7 +131,22 @@ public class H2CacheImpl<K, V> extends AbstractLoadingCache<K, V> implements Per
@Override
public V get(K key, Callable<? extends V> valueLoader) throws ExecutionException {
return mem.get(key, new LoadingCallable(key, valueLoader)).value;
return mem.get(
key,
() -> {
if (store.mightContain(key)) {
ValueHolder<V> h = store.getIfPresent(key);
if (h != null) {
return h;
}
}
ValueHolder<V> h = new ValueHolder<>(valueLoader.call());
h.created = TimeUtil.nowMs();
executor.execute(() -> store.put(key, h));
return h;
})
.value;
}
@Override
@@ -239,31 +254,6 @@ public class H2CacheImpl<K, V> extends AbstractLoadingCache<K, V> implements Per
}
}
private class LoadingCallable implements Callable<ValueHolder<V>> {
private final K key;
private final Callable<? extends V> loader;
LoadingCallable(K key, Callable<? extends V> loader) {
this.key = key;
this.loader = loader;
}
@Override
public ValueHolder<V> call() throws Exception {
if (store.mightContain(key)) {
ValueHolder<V> h = store.getIfPresent(key);
if (h != null) {
return h;
}
}
final ValueHolder<V> h = new ValueHolder<>(loader.call());
h.created = TimeUtil.nowMs();
executor.execute(() -> store.put(key, h));
return h;
}
}
private static class KeyType<K> {
String columnType() {
return "OTHER";

View File

@@ -24,7 +24,6 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.google.gerrit.server.cache.h2.H2CacheImpl.SqlStore;
import com.google.gerrit.server.cache.h2.H2CacheImpl.ValueHolder;
import com.google.inject.TypeLiteral;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Before;
@@ -54,12 +53,9 @@ public class H2CacheTest {
assertTrue(
impl.get(
"foo",
new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
() -> {
called.set(true);
return true;
}
}));
assertTrue("used Callable", called.get());
assertTrue("exists in cache", impl.getIfPresent("foo"));
@@ -70,12 +66,9 @@ public class H2CacheTest {
assertTrue(
impl.get(
"foo",
new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
() -> {
called.set(true);
return true;
}
}));
assertFalse("did not invoke Callable", called.get());
}

View File

@@ -20,7 +20,6 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
import com.google.common.base.Joiner;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
@@ -245,47 +244,26 @@ public abstract class AbstractLuceneIndex<K, V> implements Index<K, V> {
}
ListenableFuture<?> insert(final Document doc) {
return submit(
new Callable<Long>() {
@Override
public Long call() throws IOException, InterruptedException {
return writer.addDocument(doc);
}
});
return submit(() -> writer.addDocument(doc));
}
ListenableFuture<?> replace(final Term term, final Document doc) {
return submit(
new Callable<Long>() {
@Override
public Long call() throws IOException, InterruptedException {
return writer.updateDocument(term, doc);
}
});
return submit(() -> writer.updateDocument(term, doc));
}
ListenableFuture<?> delete(final Term term) {
return submit(
new Callable<Long>() {
@Override
public Long call() throws IOException, InterruptedException {
return writer.deleteDocuments(term);
}
});
return submit(() -> writer.deleteDocuments(term));
}
private ListenableFuture<?> submit(Callable<Long> task) {
ListenableFuture<Long> future = Futures.nonCancellationPropagating(writerThread.submit(task));
return Futures.transformAsync(
future,
new AsyncFunction<Long, Void>() {
@Override
public ListenableFuture<Void> apply(Long gen) throws InterruptedException {
gen -> {
// Tell the reopen thread a future is waiting on this
// generation so it uses the min stale time when refreshing.
reopenThread.waitForGeneration(gen, 0);
return new NrtFuture(gen);
}
},
directExecutor());
}

View File

@@ -66,7 +66,6 @@ import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.eclipse.jgit.lib.BatchRefUpdate;
@@ -155,16 +154,13 @@ public class RebuildNoteDb extends SiteProgram {
for (final Project.NameKey project : projectNames) {
ListenableFuture<Boolean> future =
executor.submit(
new Callable<Boolean>() {
@Override
public Boolean call() {
() -> {
try (ReviewDb db = unwrapDb(schemaFactory.open())) {
return rebuildProject(db, changesByProject, project, allUsersRepo);
} catch (Exception e) {
log.error("Error rebuilding project " + project, e);
return false;
}
}
});
futures.add(future);
}

View File

@@ -56,7 +56,6 @@ import com.google.inject.Singleton;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.eclipse.jgit.errors.ConfigInvalidException;
@@ -623,7 +622,17 @@ public class ExternalIdsUpdate {
try (Repository repo = repoManager.openRepository(allUsersName);
RevWalk rw = new RevWalk(repo);
ObjectInserter ins = repo.newObjectInserter()) {
return retryer.call(new TryNoteMapUpdate(repo, rw, ins, update));
return retryer.call(
() -> {
ObjectId rev = readRevision(repo);
afterReadRevision.run();
NoteMap noteMap = readNoteMap(rw, rev);
update.accept(OpenRepo.create(repo, rw, ins, noteMap));
return commit(repo, rw, ins, rev, noteMap);
});
} catch (ExecutionException | RetryException e) {
if (e.getCause() != null) {
Throwables.throwIfInstanceOf(e.getCause(), IOException.class);
@@ -734,31 +743,4 @@ public class ExternalIdsUpdate {
abstract ObjectId newRev();
}
private class TryNoteMapUpdate implements Callable<RefsMetaExternalIdsUpdate> {
private final Repository repo;
private final RevWalk rw;
private final ObjectInserter ins;
private final MyConsumer<OpenRepo> update;
private TryNoteMapUpdate(
Repository repo, RevWalk rw, ObjectInserter ins, MyConsumer<OpenRepo> update) {
this.repo = repo;
this.rw = rw;
this.ins = ins;
this.update = update;
}
@Override
public RefsMetaExternalIdsUpdate call() throws Exception {
ObjectId rev = readRevision(repo);
afterReadRevision.run();
NoteMap noteMap = readNoteMap(rw, rev);
update.accept(OpenRepo.create(repo, rw, ins, noteMap));
return commit(repo, rw, ins, rev, noteMap);
}
}
}

View File

@@ -31,9 +31,7 @@ import com.google.gerrit.reviewdb.client.Branch;
import com.google.gerrit.server.cache.CacheModule;
import com.google.gerrit.server.git.CodeReviewCommit;
import com.google.gerrit.server.git.CodeReviewCommit.CodeReviewRevWalk;
import com.google.gerrit.server.git.IntegrationException;
import com.google.gerrit.server.git.strategy.SubmitDryRun;
import com.google.gerrit.server.project.NoSuchProjectException;
import com.google.inject.Inject;
import com.google.inject.Module;
import com.google.inject.Singleton;
@@ -45,7 +43,6 @@ import java.io.Serializable;
import java.util.Arrays;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.Ref;
@@ -173,31 +170,6 @@ public class MergeabilityCacheImpl implements MergeabilityCache {
}
}
private class Loader implements Callable<Boolean> {
private final EntryKey key;
private final Branch.NameKey dest;
private final Repository repo;
Loader(EntryKey key, Branch.NameKey dest, Repository repo) {
this.key = key;
this.dest = dest;
this.repo = repo;
}
@Override
public Boolean call() throws NoSuchProjectException, IntegrationException, IOException {
if (key.into.equals(ObjectId.zeroId())) {
return true; // Assume yes on new branch.
}
try (CodeReviewRevWalk rw = CodeReviewCommit.newRevWalk(repo)) {
Set<RevCommit> accepted = SubmitDryRun.getAlreadyAccepted(repo, rw);
accepted.add(rw.parseCommit(key.into));
accepted.addAll(Arrays.asList(rw.parseCommit(key.commit).getParents()));
return submitDryRun.run(key.submitType, repo, rw, dest, key.into, key.commit, accepted);
}
}
}
public static class MergeabilityWeigher implements Weigher<EntryKey, Boolean> {
@Override
public int weigh(EntryKey k, Boolean v) {
@@ -229,7 +201,20 @@ public class MergeabilityCacheImpl implements MergeabilityCache {
ObjectId into = intoRef != null ? intoRef.getObjectId() : ObjectId.zeroId();
EntryKey key = new EntryKey(commit, into, submitType, mergeStrategy);
try {
return cache.get(key, new Loader(key, dest, repo));
return cache.get(
key,
() -> {
if (key.into.equals(ObjectId.zeroId())) {
return true; // Assume yes on new branch.
}
try (CodeReviewRevWalk rw = CodeReviewCommit.newRevWalk(repo)) {
Set<RevCommit> accepted = SubmitDryRun.getAlreadyAccepted(repo, rw);
accepted.add(rw.parseCommit(key.into));
accepted.addAll(Arrays.asList(rw.parseCommit(key.commit).getParents()));
return submitDryRun.run(
key.submitType, repo, rw, dest, key.into, key.commit, accepted);
}
});
} catch (ExecutionException | UncheckedExecutionException e) {
log.error(
String.format(

View File

@@ -64,11 +64,9 @@ public class PerThreadRequestScope {
}
public <T> Callable<T> scope(RequestContext requestContext, Callable<T> callable) {
final Context ctx = new Context();
final Callable<T> wrapped = context(requestContext, cleanup(callable));
return new Callable<T>() {
@Override
public T call() throws Exception {
Context ctx = new Context();
Callable<T> wrapped = context(requestContext, cleanup(callable));
return () -> {
Context old = current.get();
current.set(ctx);
try {
@@ -76,7 +74,6 @@ public class PerThreadRequestScope {
} finally {
current.set(old);
}
}
};
}
}

View File

@@ -33,7 +33,6 @@ import com.google.inject.Singleton;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -76,20 +75,18 @@ public class AllAccountsIndexer extends SiteIndexer<Account.Id, AccountState, Ac
}
private SiteIndexer.Result reindexAccounts(
final AccountIndex index, List<Account.Id> ids, ProgressMonitor progress) {
AccountIndex index, List<Account.Id> ids, ProgressMonitor progress) {
progress.beginTask("Reindexing accounts", ids.size());
List<ListenableFuture<?>> futures = new ArrayList<>(ids.size());
AtomicBoolean ok = new AtomicBoolean(true);
final AtomicInteger done = new AtomicInteger();
final AtomicInteger failed = new AtomicInteger();
AtomicInteger done = new AtomicInteger();
AtomicInteger failed = new AtomicInteger();
Stopwatch sw = Stopwatch.createStarted();
for (final Account.Id id : ids) {
final String desc = "account " + id;
String desc = "account " + id;
ListenableFuture<?> future =
executor.submit(
new Callable<Void>() {
@Override
public Void call() throws Exception {
() -> {
try {
accountCache.evict(id);
index.replace(accountCache.get(id));
@@ -100,7 +97,6 @@ public class AllAccountsIndexer extends SiteIndexer<Account.Id, AccountState, Ac
throw e;
}
return null;
}
});
addErrorListener(future, desc, progress, ok);
futures.add(future);

View File

@@ -32,7 +32,6 @@ import com.google.inject.Singleton;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -79,16 +78,14 @@ public class AllGroupsIndexer extends SiteIndexer<AccountGroup.UUID, AccountGrou
progress.beginTask("Reindexing groups", uuids.size());
List<ListenableFuture<?>> futures = new ArrayList<>(uuids.size());
AtomicBoolean ok = new AtomicBoolean(true);
final AtomicInteger done = new AtomicInteger();
final AtomicInteger failed = new AtomicInteger();
AtomicInteger done = new AtomicInteger();
AtomicInteger failed = new AtomicInteger();
Stopwatch sw = Stopwatch.createStarted();
for (final AccountGroup.UUID uuid : uuids) {
final String desc = "group " + uuid;
String desc = "group " + uuid;
ListenableFuture<?> future =
executor.submit(
new Callable<Void>() {
@Override
public Void call() throws Exception {
() -> {
try {
AccountGroup oldGroup = groupCache.get(uuid);
if (oldGroup != null) {
@@ -102,7 +99,6 @@ public class AllGroupsIndexer extends SiteIndexer<AccountGroup.UUID, AccountGrou
throw e;
}
return null;
}
});
addErrorListener(future, desc, progress, ok);
futures.add(future);

View File

@@ -25,7 +25,6 @@ import com.google.gerrit.server.notedb.NoteDbUpdateManager.Result;
import com.google.gwtorm.server.OrmException;
import com.google.gwtorm.server.SchemaFactory;
import java.io.IOException;
import java.util.concurrent.Callable;
public abstract class ChangeRebuilder {
public static class NoPatchSetsException extends OrmException {
@@ -45,13 +44,10 @@ public abstract class ChangeRebuilder {
public final ListenableFuture<Result> rebuildAsync(
final Change.Id id, ListeningExecutorService executor) {
return executor.submit(
new Callable<Result>() {
@Override
public Result call() throws Exception {
() -> {
try (ReviewDb db = schemaFactory.open()) {
return rebuild(db, id);
}
}
});
}

View File

@@ -75,12 +75,7 @@ class IntraLineLoader implements Callable<IntraLineDiff> {
public IntraLineDiff call() throws Exception {
Future<IntraLineDiff> result =
diffExecutor.submit(
new Callable<IntraLineDiff>() {
@Override
public IntraLineDiff call() throws Exception {
return IntraLineLoader.compute(args.aText(), args.bText(), args.edits());
}
});
() -> IntraLineLoader.compute(args.aText(), args.bText(), args.edits()));
try {
return result.get(timeoutMillis, TimeUnit.MILLISECONDS);
} catch (InterruptedException | TimeoutException e) {

View File

@@ -255,13 +255,10 @@ public class PatchListLoader implements Callable<PatchList> {
Future<FileHeader> result =
diffExecutor.submit(
new Callable<FileHeader>() {
@Override
public FileHeader call() throws IOException {
() -> {
synchronized (diffEntry) {
return diffFormatter.toFileHeader(diffEntry);
}
}
});
try {

View File

@@ -173,9 +173,7 @@ public abstract class RequestScopePropagator {
protected abstract <T> Callable<T> wrapImpl(Callable<T> callable);
protected <T> Callable<T> context(final RequestContext context, final Callable<T> callable) {
return new Callable<T>() {
@Override
public T call() throws Exception {
return () -> {
RequestContext old =
local.setContext(
new RequestContext() {
@@ -194,14 +192,11 @@ public abstract class RequestScopePropagator {
} finally {
local.setContext(old);
}
}
};
}
protected <T> Callable<T> cleanup(final Callable<T> callable) {
return new Callable<T>() {
@Override
public T call() throws Exception {
return () -> {
RequestCleanup cleanup =
scope
.scope(
@@ -218,7 +213,6 @@ public abstract class RequestScopePropagator {
} finally {
cleanup.run();
}
}
};
}
}

View File

@@ -42,10 +42,8 @@ public abstract class ThreadLocalRequestScopePropagator<C> extends RequestScopeP
/** @see RequestScopePropagator#wrap(Callable) */
@Override
protected final <T> Callable<T> wrapImpl(final Callable<T> callable) {
final C ctx = continuingContext(requireContext());
return new Callable<T>() {
@Override
public T call() throws Exception {
C ctx = continuingContext(requireContext());
return () -> {
C old = threadLocal.get();
threadLocal.set(ctx);
try {
@@ -57,7 +55,6 @@ public abstract class ThreadLocalRequestScopePropagator<C> extends RequestScopeP
threadLocal.remove();
}
}
}
};
}