Move PushQueue and ReplicationQueue to singletons managed by Guice

Signed-off-by: Shawn O. Pearce <sop@google.com>
This commit is contained in:
Shawn O. Pearce
2009-07-28 11:18:50 -07:00
parent f722b5ca86
commit 20fe732921
16 changed files with 412 additions and 303 deletions

View File

@@ -0,0 +1,149 @@
// Copyright (C) 2009 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.git;
import com.google.gerrit.client.reviewdb.Branch;
import com.google.gerrit.client.reviewdb.Project;
import com.google.gerrit.server.GerritServer;
import com.google.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class ChangeMergeQueue implements MergeQueue {
private static final Logger log =
LoggerFactory.getLogger(ChangeMergeQueue.class);
private final Map<Branch.NameKey, MergeEntry> active =
new HashMap<Branch.NameKey, MergeEntry>();
private final GerritServer server;
private final ReplicationQueue replication;
@Inject
ChangeMergeQueue(final GerritServer gs, final ReplicationQueue rq) {
server = gs;
replication = rq;
}
@Override
public void merge(final Branch.NameKey branch) {
if (start(branch)) {
try {
mergeImpl(branch);
} finally {
finish(branch);
}
}
}
private synchronized boolean start(final Branch.NameKey branch) {
final MergeEntry e = active.get(branch);
if (e == null) {
// Let the caller attempt this merge, its the only one interested
// in processing this branch right now.
//
active.put(branch, new MergeEntry(branch));
return true;
} else {
// Request that the job queue handle this merge later.
//
e.needMerge = true;
return false;
}
}
@Override
public synchronized void schedule(final Branch.NameKey branch) {
MergeEntry e = active.get(branch);
if (e == null) {
e = new MergeEntry(branch);
active.put(branch, e);
}
e.needMerge = true;
scheduleJob(e);
}
private synchronized void finish(final Branch.NameKey branch) {
final MergeEntry e = active.get(branch);
if (e == null) {
// Not registered? Shouldn't happen but ignore it.
//
return;
}
if (!e.needMerge) {
// No additional merges are in progress, we can delete it.
//
active.remove(branch);
return;
}
scheduleJob(e);
}
private void scheduleJob(final MergeEntry e) {
if (!e.jobScheduled) {
// No job has been scheduled to execute this branch, but it needs
// to run a merge again.
//
e.jobScheduled = true;
WorkQueue.schedule(new Runnable() {
public void run() {
unschedule(e);
try {
mergeImpl(e.dest);
} finally {
finish(e.dest);
}
}
@Override
public String toString() {
final Branch.NameKey branch = e.dest;
final Project.NameKey project = branch.getParentKey();
return "submit " + project.get() + " " + branch.getShortName();
}
}, 0, TimeUnit.SECONDS);
}
}
private synchronized void unschedule(final MergeEntry e) {
e.jobScheduled = false;
e.needMerge = false;
}
private void mergeImpl(final Branch.NameKey branch) {
try {
new MergeOp(server, replication, branch).merge();
} catch (Throwable e) {
log.error("Merge attempt for " + branch + " failed", e);
}
}
private static class MergeEntry {
final Branch.NameKey dest;
boolean needMerge;
boolean jobScheduled;
MergeEntry(final Branch.NameKey d) {
dest = d;
}
}
}

View File

@@ -96,6 +96,7 @@ public class MergeOp {
private static final FooterKey REVIEWED_ON = new FooterKey("Reviewed-on");
private final GerritServer server;
private final ReplicationQueue replication;
private final PersonIdent myIdent;
private final Branch.NameKey destBranch;
private Project destProject;
@@ -110,8 +111,10 @@ public class MergeOp {
private CodeReviewCommit mergeTip;
private RefUpdate branchUpdate;
public MergeOp(final GerritServer gs, final Branch.NameKey branch) {
public MergeOp(final GerritServer gs, final ReplicationQueue rq,
final Branch.NameKey branch) {
server = gs;
replication = rq;
myIdent = server.newGerritPersonIdent();
destBranch = branch;
toMerge = new ArrayList<CodeReviewCommit>();
@@ -715,7 +718,7 @@ public class MergeOp {
switch (branchUpdate.update(rw)) {
case NEW:
case FAST_FORWARD:
PushQueue.scheduleUpdate(destBranch.getParentKey(), branchUpdate
replication.scheduleUpdate(destBranch.getParentKey(), branchUpdate
.getName());
break;

View File

@@ -15,121 +15,9 @@
package com.google.gerrit.git;
import com.google.gerrit.client.reviewdb.Branch;
import com.google.gerrit.client.reviewdb.Project;
import com.google.gerrit.server.GerritServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public interface MergeQueue {
void merge(Branch.NameKey branch);
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class MergeQueue {
private static final Logger log = LoggerFactory.getLogger(MergeQueue.class);
private static final Map<Branch.NameKey, MergeEntry> active =
new HashMap<Branch.NameKey, MergeEntry>();
public static void merge(final Branch.NameKey branch) {
if (start(branch)) {
try {
mergeImpl(branch);
} finally {
finish(branch);
}
}
}
public static synchronized boolean start(final Branch.NameKey branch) {
final MergeEntry e = active.get(branch);
if (e == null) {
// Let the caller attempt this merge, its the only one interested
// in processing this branch right now.
//
active.put(branch, new MergeEntry(branch));
return true;
} else {
// Request that the job queue handle this merge later.
//
e.needMerge = true;
return false;
}
}
public static synchronized void schedule(final Branch.NameKey branch) {
MergeEntry e = active.get(branch);
if (e == null) {
e = new MergeEntry(branch);
active.put(branch, e);
}
e.needMerge = true;
scheduleJobImp(e);
}
public static synchronized void finish(final Branch.NameKey branch) {
final MergeEntry e = active.get(branch);
if (e == null) {
// Not registered for a build? Shouldn't happen but ignore it.
//
return;
}
if (!e.needMerge) {
// No additional merges are in progress, we can delete it.
//
active.remove(branch);
return;
}
scheduleJobImp(e);
}
private static void scheduleJobImp(final MergeEntry e) {
if (!e.jobScheduled) {
// No job has been scheduled to execute this branch, but it needs
// to run a merge again.
//
e.jobScheduled = true;
WorkQueue.schedule(new Runnable() {
public void run() {
unschedule(e);
try {
mergeImpl(e.dest);
} finally {
finish(e.dest);
}
}
@Override
public String toString() {
final Branch.NameKey branch = e.dest;
final Project.NameKey project = branch.getParentKey();
return "submit " + project.get() + " " + branch.getShortName();
}
}, 0, TimeUnit.SECONDS);
}
}
private static synchronized void unschedule(final MergeEntry e) {
e.jobScheduled = false;
e.needMerge = false;
}
private static void mergeImpl(final Branch.NameKey branch) {
try {
new MergeOp(GerritServer.getInstance(), branch).merge();
} catch (Throwable e) {
log.error("Merge attempt for " + branch + " failed", e);
}
}
private static class MergeEntry {
final Branch.NameKey dest;
boolean needMerge;
boolean jobScheduled;
MergeEntry(final Branch.NameKey d) {
dest = d;
}
}
void schedule(Branch.NameKey branch);
}

View File

@@ -18,7 +18,6 @@ import com.google.gerrit.client.reviewdb.Branch;
import com.google.gerrit.client.reviewdb.Project;
import com.google.gerrit.client.reviewdb.ProjectRight;
import com.google.gerrit.client.reviewdb.ReviewDb;
import com.google.gerrit.client.rpc.Common;
import com.google.gerrit.server.GerritServer;
import com.google.gwtorm.client.OrmException;
@@ -32,14 +31,17 @@ public class PushAllProjectsOp implements Runnable {
LoggerFactory.getLogger(PushAllProjectsOp.class);
private final GerritServer server;
private final ReplicationQueue replication;
private final String urlMatch;
public PushAllProjectsOp(final GerritServer gs) {
this(gs, null);
public PushAllProjectsOp(final GerritServer gs, final ReplicationQueue rq) {
this(gs, rq, null);
}
public PushAllProjectsOp(final GerritServer gs, final String urlMatch) {
public PushAllProjectsOp(final GerritServer gs, final ReplicationQueue rq,
final String urlMatch) {
this.server = gs;
this.replication = rq;
this.urlMatch = urlMatch;
}
@@ -50,7 +52,7 @@ public class PushAllProjectsOp implements Runnable {
try {
for (final Project project : db.projects().all()) {
if (!ProjectRight.WILD_PROJECT.equals(project.getId())) {
PushQueue.scheduleFullSync(project.getNameKey(), urlMatch);
replication.scheduleFullSync(project.getNameKey(), urlMatch);
}
}
} finally {

View File

@@ -15,6 +15,7 @@
package com.google.gerrit.git;
import com.google.gerrit.server.GerritServer;
import com.google.inject.Inject;
import com.jcraft.jsch.JSchException;
@@ -43,18 +44,20 @@ import java.util.Map;
import java.util.Set;
/**
* A push to remote operation started by {@link PushQueue}.
* A push to remote operation started by {@link ReplicationQueue}.
* <p>
* Instance members are protected by the lock within PushQueue. Callers must
* take that lock to ensure they are working with a current view of the object.
*/
class PushOp implements Runnable {
private static final Logger log = PushQueue.log;
private static final Logger log = PushReplication.log;
static final String MIRROR_ALL = "..all..";
private final GerritServer server;
@Inject
private GerritServer server;
private final Set<String> delta = new HashSet<String>();
private final PushQueue.ReplicationConfig pool;
private final PushReplication.ReplicationConfig pool;
private final String projectName;
private final RemoteConfig config;
private final URIish uri;
@@ -62,9 +65,8 @@ class PushOp implements Runnable {
private Repository db;
PushOp(final GerritServer gs, final PushQueue.ReplicationConfig p,
final String d, final RemoteConfig c, final URIish u) {
server = gs;
PushOp(final PushReplication.ReplicationConfig p, final String d,
final RemoteConfig c, final URIish u) {
pool = p;
projectName = d;
config = c;

View File

@@ -16,8 +16,8 @@ package com.google.gerrit.git;
import com.google.gerrit.client.reviewdb.Project;
import com.google.gerrit.server.GerritServer;
import com.google.gwtjsonrpc.server.XsrfException;
import com.google.gwtorm.client.OrmException;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.jcraft.jsch.Session;
@@ -44,9 +44,8 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
/** Manages automatic replication to remote repositories. */
public class PushQueue {
static final Logger log = LoggerFactory.getLogger(PushQueue.class);
private static List<ReplicationConfig> configs;
public class PushReplication implements ReplicationQueue {
static final Logger log = LoggerFactory.getLogger(PushReplication.class);
static {
// Install our own factory which always runs in batch mode, as we
@@ -60,44 +59,35 @@ public class PushQueue {
});
}
/** Determine if replication is enabled, or not. */
public static boolean isReplicationEnabled() {
return !allConfigs().isEmpty();
private final Injector injector;
private final GerritServer server;
private final List<ReplicationConfig> configs;
@Inject
PushReplication(final Injector i, final GerritServer gs) {
injector = i;
server = gs;
configs = allConfigs();
}
/**
* Schedule a full replication for a single project.
* <p>
* All remote URLs are checked to verify the are current with regards to the
* local project state. If not, they are updated by pushing new refs, updating
* existing ones which don't match, and deleting stale refs which have been
* removed from the local repository.
*
* @param project identity of the project to replicate.
* @param urlMatch substring that must appear in a URI to support replication.
*/
public static void scheduleFullSync(final Project.NameKey project,
@Override
public boolean isEnabled() {
return configs.size() > 0;
}
@Override
public void scheduleFullSync(final Project.NameKey project,
final String urlMatch) {
for (final ReplicationConfig cfg : allConfigs()) {
for (final ReplicationConfig cfg : configs) {
for (final URIish uri : cfg.getURIs(project, urlMatch)) {
cfg.schedule(project, PushOp.MIRROR_ALL, uri);
}
}
}
/**
* Schedule update of a single ref.
* <p>
* This method automatically tries to batch together multiple requests in the
* same project, to take advantage of Git's native ability to update multiple
* refs during a single push operation.
*
* @param project identity of the project to replicate.
* @param ref unique name of the ref; must start with {@code refs/}.
*/
public static void scheduleUpdate(final Project.NameKey project,
final String ref) {
for (final ReplicationConfig cfg : allConfigs()) {
@Override
public void scheduleUpdate(final Project.NameKey project, final String ref) {
for (final ReplicationConfig cfg : configs) {
if (cfg.wouldPushRef(ref)) {
for (final URIish uri : cfg.getURIs(project, null)) {
cfg.schedule(project, ref, uri);
@@ -112,77 +102,62 @@ public class PushQueue {
return pat.substring(0, n) + val + pat.substring(n + 3 + key.length());
}
private static synchronized List<ReplicationConfig> allConfigs() {
if (configs == null) {
final GerritServer gs;
try {
gs = GerritServer.getInstance();
} catch (OrmException e) {
return Collections.emptyList();
} catch (XsrfException e) {
return Collections.emptyList();
}
final File path = gs.getSitePath();
if (path == null) {
return Collections.emptyList();
}
final File cfgFile = new File(path, "replication.config");
final RepositoryConfig cfg = new RepositoryConfig(null, cfgFile);
try {
cfg.load();
final List<ReplicationConfig> r = new ArrayList<ReplicationConfig>();
for (final RemoteConfig c : RemoteConfig.getAllRemoteConfigs(cfg)) {
if (c.getURIs().isEmpty()) {
continue;
}
for (final URIish u : c.getURIs()) {
if (u.getPath() == null || !u.getPath().contains("${name}")) {
final String s = u.toString();
throw new URISyntaxException(s, "No ${name}");
}
}
if (c.getPushRefSpecs().isEmpty()) {
RefSpec spec = new RefSpec();
spec = spec.setSourceDestination("refs/*", "refs/*");
spec = spec.setForceUpdate(true);
c.addPushRefSpec(spec);
}
r.add(new ReplicationConfig(gs, c, cfg));
}
configs = Collections.unmodifiableList(r);
} catch (FileNotFoundException e) {
log.warn("No " + cfgFile + "; not replicating");
configs = Collections.emptyList();
} catch (ConfigInvalidException e) {
log.error("Can't read " + cfgFile, e);
return Collections.emptyList();
} catch (IOException e) {
log.error("Can't read " + cfgFile, e);
return Collections.emptyList();
} catch (URISyntaxException e) {
log.error("Invalid URI in " + cfgFile + ": " + e.getMessage());
return Collections.emptyList();
}
private synchronized List<ReplicationConfig> allConfigs() {
final File path = server.getSitePath();
if (path == null) {
return Collections.emptyList();
}
final File cfgFile = new File(path, "replication.config");
final RepositoryConfig cfg = new RepositoryConfig(null, cfgFile);
try {
cfg.load();
final List<ReplicationConfig> r = new ArrayList<ReplicationConfig>();
for (final RemoteConfig c : RemoteConfig.getAllRemoteConfigs(cfg)) {
if (c.getURIs().isEmpty()) {
continue;
}
for (final URIish u : c.getURIs()) {
if (u.getPath() == null || !u.getPath().contains("${name}")) {
final String s = u.toString();
throw new URISyntaxException(s, "No ${name}");
}
}
if (c.getPushRefSpecs().isEmpty()) {
RefSpec spec = new RefSpec();
spec = spec.setSourceDestination("refs/*", "refs/*");
spec = spec.setForceUpdate(true);
c.addPushRefSpec(spec);
}
r.add(new ReplicationConfig(c, cfg));
}
return Collections.unmodifiableList(r);
} catch (FileNotFoundException e) {
log.warn("No " + cfgFile + "; not replicating");
return Collections.emptyList();
} catch (ConfigInvalidException e) {
log.error("Can't read " + cfgFile, e);
return Collections.emptyList();
} catch (IOException e) {
log.error("Can't read " + cfgFile, e);
return Collections.emptyList();
} catch (URISyntaxException e) {
log.error("Invalid URI in " + cfgFile + ": " + e.getMessage());
return Collections.emptyList();
}
return configs;
}
static class ReplicationConfig {
private final GerritServer server;
class ReplicationConfig {
private final RemoteConfig remote;
private final int delay;
private final WorkQueue.Executor pool;
private final Map<URIish, PushOp> pending = new HashMap<URIish, PushOp>();
ReplicationConfig(final GerritServer gs, final RemoteConfig rc,
final RepositoryConfig cfg) {
server = gs;
ReplicationConfig(final RemoteConfig rc, final RepositoryConfig cfg) {
remote = rc;
delay = Math.max(0, getInt(rc, cfg, "replicationdelay", 15));
@@ -191,8 +166,8 @@ public class PushQueue {
pool = WorkQueue.createQueue(poolSize, poolName);
}
private static int getInt(final RemoteConfig rc,
final RepositoryConfig cfg, final String name, final int defValue) {
private int getInt(final RemoteConfig rc, final RepositoryConfig cfg,
final String name, final int defValue) {
return cfg.getInt("remote", rc.getName(), name, defValue);
}
@@ -201,7 +176,8 @@ public class PushQueue {
synchronized (pending) {
PushOp e = pending.get(uri);
if (e == null) {
e = new PushOp(server, this, project.get(), remote, uri);
e = new PushOp(this, project.get(), remote, uri);
injector.injectMembers(e);
pool.schedule(e, delay, TimeUnit.SECONDS);
pending.put(uri, e);
}
@@ -235,7 +211,7 @@ public class PushQueue {
return r;
}
private static boolean matches(URIish uri, final String urlMatch) {
private boolean matches(URIish uri, final String urlMatch) {
if (urlMatch == null || urlMatch.equals("") || urlMatch.equals("*")) {
return true;
}

View File

@@ -0,0 +1,48 @@
// Copyright (C) 2009 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.git;
import com.google.gerrit.client.reviewdb.Project;
/** Manages replication to other nodes. */
public interface ReplicationQueue {
/** Is replication to one or more other destinations configured? */
boolean isEnabled();
/**
* Schedule a full replication for a single project.
* <p>
* All remote URLs are checked to verify the are current with regards to the
* local project state. If not, they are updated by pushing new refs, updating
* existing ones which don't match, and deleting stale refs which have been
* removed from the local repository.
*
* @param project identity of the project to replicate.
* @param urlMatch substring that must appear in a URI to support replication.
*/
void scheduleFullSync(Project.NameKey project, String urlMatch);
/**
* Schedule update of a single ref.
* <p>
* This method automatically tries to batch together multiple requests in the
* same project, to take advantage of Git's native ability to update multiple
* refs during a single push operation.
*
* @param project identity of the project to replicate.
* @param ref unique name of the ref; must start with {@code refs/}.
*/
void scheduleUpdate(Project.NameKey project, String ref);
}

View File

@@ -22,7 +22,7 @@ import com.google.gerrit.server.GerritServer;
public class CreateSchema extends AbstractProgram {
@Override
public int run() throws Exception {
final GerritServer gs = GerritServer.getInstance(false);
final GerritServer gs = GerritServer.getInstance();
gs.getSchemaFactory().open().close();
System.out.println("Gerrit2 schema initialized");
return 0;

View File

@@ -52,7 +52,7 @@ import java.util.ArrayList;
public class ReimportPatchSets extends AbstractProgram {
@Override
public int run() throws Exception {
final GerritServer gs = GerritServer.getInstance(false);
final GerritServer gs = GerritServer.getInstance();
final ArrayList<PatchSet.Id> todo = new ArrayList<PatchSet.Id>();
final BufferedReader br =
new BufferedReader(new InputStreamReader(System.in));

View File

@@ -40,9 +40,12 @@ import java.util.List;
class ChangeManageServiceImpl extends BaseServiceImplementation implements
ChangeManageService {
private final MergeQueue merger;
@Inject
ChangeManageServiceImpl(final GerritServer gs) {
ChangeManageServiceImpl(final GerritServer gs, final MergeQueue mq) {
super(gs);
merger = mq;
}
public void patchSetAction(final ApprovalCategoryValue.Id value,
@@ -135,7 +138,7 @@ class ChangeManageServiceImpl extends BaseServiceImplementation implements
txn.commit();
if (change.getStatus() == Change.Status.SUBMITTED) {
MergeQueue.merge(change.getDest());
merger.merge(change.getDest());
}
return VoidResult.INSTANCE;

View File

@@ -20,8 +20,6 @@ import com.google.gerrit.client.data.ProjectCache;
import com.google.gerrit.client.reviewdb.AccountGroup;
import com.google.gerrit.client.reviewdb.ApprovalCategory;
import com.google.gerrit.client.reviewdb.ApprovalCategoryValue;
import com.google.gerrit.client.reviewdb.Branch;
import com.google.gerrit.client.reviewdb.Change;
import com.google.gerrit.client.reviewdb.Project;
import com.google.gerrit.client.reviewdb.ProjectRight;
import com.google.gerrit.client.reviewdb.ReviewDb;
@@ -30,10 +28,6 @@ import com.google.gerrit.client.reviewdb.SystemConfig;
import com.google.gerrit.client.reviewdb.TrustedExternalId;
import com.google.gerrit.client.reviewdb.SystemConfig.LoginType;
import com.google.gerrit.client.rpc.Common;
import com.google.gerrit.git.MergeQueue;
import com.google.gerrit.git.PushAllProjectsOp;
import com.google.gerrit.git.PushQueue;
import com.google.gerrit.git.WorkQueue;
import com.google.gerrit.server.mail.EmailException;
import com.google.gerrit.server.patch.DiffCacheEntryFactory;
import com.google.gerrit.server.ssh.SshKeyCacheEntryFactory;
@@ -82,11 +76,9 @@ import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import javax.naming.InitialContext;
import javax.naming.NamingException;
@@ -139,20 +131,9 @@ public class GerritServer {
*/
public static synchronized GerritServer getInstance() throws OrmException,
XsrfException {
return getInstance(true);
}
public static synchronized GerritServer getInstance(final boolean startQueues)
throws OrmException, XsrfException {
if (impl == null) {
try {
impl = new GerritServer();
if (startQueues) {
impl.reloadSubmitQueue();
if (PushQueue.isReplicationEnabled()) {
WorkQueue.schedule(new PushAllProjectsOp(impl), 30, TimeUnit.SECONDS);
}
}
} catch (OrmException e) {
closeDataSource();
log.error("GerritServer ORM is unavailable", e);
@@ -744,35 +725,6 @@ public class GerritServer {
WindowCache.reconfigure(c);
}
private void reloadSubmitQueue() {
WorkQueue.schedule(new Runnable() {
public void run() {
final HashSet<Branch.NameKey> pending = new HashSet<Branch.NameKey>();
try {
final ReviewDb c = db.open();
try {
for (final Change change : c.changes().allSubmitted()) {
pending.add(change.getDest());
}
} finally {
c.close();
}
} catch (OrmException e) {
log.error("Cannot reload MergeQueue", e);
}
for (final Branch.NameKey branch : pending) {
MergeQueue.schedule(branch);
}
}
@Override
public String toString() {
return "Reload Submit Queue";
}
}, 15, TimeUnit.SECONDS);
}
/** Time (in seconds) that user sessions stay "signed in". */
public int getSessionAge() {
return sessionAge;

View File

@@ -15,17 +15,21 @@
package com.google.gerrit.server;
import com.google.gerrit.client.data.GerritConfig;
import com.google.gerrit.git.ChangeMergeQueue;
import com.google.gerrit.git.MergeQueue;
import com.google.gerrit.git.PushReplication;
import com.google.gerrit.git.ReplicationQueue;
import com.google.gwtjsonrpc.server.XsrfException;
import com.google.gwtorm.client.OrmException;
import com.google.inject.AbstractModule;
import com.google.inject.Scopes;
import static com.google.inject.Scopes.SINGLETON;
/** Starts {@link GerritServer} with standard dependencies. */
public class GerritServerModule extends AbstractModule {
@Override
protected void configure() {
try {
bind(GerritServer.class).toInstance(GerritServer.getInstance(true));
bind(GerritServer.class).toInstance(GerritServer.getInstance());
} catch (OrmException e) {
addError(e);
} catch (XsrfException e) {
@@ -34,7 +38,9 @@ public class GerritServerModule extends AbstractModule {
bind(ContactStore.class).toProvider(EncryptedContactStoreProvider.class);
bind(FileTypeRegistry.class).to(MimeUtilFileTypeRegistry.class);
bind(ReplicationQueue.class).to(PushReplication.class).in(SINGLETON);
bind(MergeQueue.class).to(ChangeMergeQueue.class).in(SINGLETON);
bind(GerritConfig.class).toProvider(GerritConfigProvider.class).in(
Scopes.SINGLETON);
SINGLETON);
}
}

View File

@@ -14,6 +14,12 @@
package com.google.gerrit.server;
import com.google.gerrit.client.reviewdb.Branch;
import com.google.gerrit.client.reviewdb.Change;
import com.google.gerrit.client.reviewdb.ReviewDb;
import com.google.gerrit.git.MergeQueue;
import com.google.gerrit.git.PushAllProjectsOp;
import com.google.gerrit.git.ReplicationQueue;
import com.google.gerrit.git.WorkQueue;
import com.google.gerrit.server.patch.PatchDetailServiceImpl;
import com.google.gerrit.server.ssh.GerritSshDaemon;
@@ -21,6 +27,7 @@ import com.google.gerrit.server.ssh.SshDaemonModule;
import com.google.gerrit.server.ssh.SshServlet;
import com.google.gwtexpui.server.CacheControlFilter;
import com.google.gwtjsonrpc.client.RemoteJsonService;
import com.google.gwtorm.client.OrmException;
import com.google.inject.BindingAnnotation;
import com.google.inject.ConfigurationException;
import com.google.inject.Guice;
@@ -31,16 +38,24 @@ import com.google.inject.Scopes;
import com.google.inject.servlet.GuiceServletContextListener;
import com.google.inject.servlet.ServletModule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.security.ProviderException;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletContextEvent;
/** Configures the web application environment for Gerrit Code Review. */
public class GerritServletConfig extends GuiceServletContextListener {
private static final Logger log =
LoggerFactory.getLogger(GerritServletConfig.class);
@Retention(RetentionPolicy.RUNTIME)
@BindingAnnotation
private static @interface ServletName {
@@ -124,8 +139,7 @@ public class GerritServletConfig extends GuiceServletContextListener {
}
private final Injector injector =
Guice.createInjector(createServletModule(),
new GerritServerModule(),
Guice.createInjector(createServletModule(), new GerritServerModule(),
new SshDaemonModule());
@Override
@@ -137,17 +151,72 @@ public class GerritServletConfig extends GuiceServletContextListener {
public void contextInitialized(final ServletContextEvent event) {
super.contextInitialized(event);
try {
startReplication();
} catch (ConfigurationException e) {
log.error("Unable to restart replication queue", e);
} catch (ProviderException e) {
log.error("Unable to restart replication queue", e);
}
try {
restartPendingMerges();
} catch (ConfigurationException e) {
log.error("Unable to restart merge queue", e);
} catch (ProviderException e) {
log.error("Unable to restart merge queue", e);
}
try {
injector.getInstance(GerritSshDaemon.class).start();
} catch (ConfigurationException e) {
event.getServletContext().log("Unable to start SSHD", e);
log.error("Unable to start SSHD", e);
} catch (ProviderException e) {
event.getServletContext().log("Unable to start SSHD", e);
log.error("Unable to start SSHD", e);
} catch (IOException e) {
event.getServletContext().log("Unable to start SSHD", e);
log.error("Unable to start SSHD", e);
}
}
private void startReplication() {
final ReplicationQueue rq = injector.getInstance(ReplicationQueue.class);
if (rq.isEnabled()) {
final GerritServer gs = injector.getInstance(GerritServer.class);
WorkQueue.schedule(new PushAllProjectsOp(gs, rq), 30, TimeUnit.SECONDS);
}
}
private void restartPendingMerges() {
final MergeQueue mq = injector.getInstance(MergeQueue.class);
final GerritServer gs = injector.getInstance(GerritServer.class);
WorkQueue.schedule(new Runnable() {
public void run() {
final HashSet<Branch.NameKey> pending = new HashSet<Branch.NameKey>();
try {
final ReviewDb c = gs.getSchemaFactory().open();
try {
for (final Change change : c.changes().allSubmitted()) {
pending.add(change.getDest());
}
} finally {
c.close();
}
} catch (OrmException e) {
log.error("Cannot reload MergeQueue", e);
}
for (final Branch.NameKey branch : pending) {
mq.schedule(branch);
}
}
@Override
public String toString() {
return "Reload Submit Queue";
}
}, 15, TimeUnit.SECONDS);
}
@Override
public void contextDestroyed(final ServletContextEvent event) {
try {

View File

@@ -30,7 +30,7 @@ import com.google.gerrit.client.rpc.Common;
import com.google.gerrit.client.rpc.InvalidNameException;
import com.google.gerrit.client.rpc.InvalidRevisionException;
import com.google.gerrit.client.rpc.NoSuchEntityException;
import com.google.gerrit.git.PushQueue;
import com.google.gerrit.git.ReplicationQueue;
import com.google.gwt.user.client.rpc.AsyncCallback;
import com.google.gwtjsonrpc.client.VoidResult;
import com.google.gwtorm.client.OrmException;
@@ -67,9 +67,12 @@ class ProjectAdminServiceImpl extends BaseServiceImplementation implements
ProjectAdminService {
private final Logger log = LoggerFactory.getLogger(getClass());
private final ReplicationQueue replication;
@Inject
ProjectAdminServiceImpl(final GerritServer gs) {
ProjectAdminServiceImpl(final GerritServer gs, final ReplicationQueue rq) {
super(gs);
replication = rq;
}
public void ownedProjects(final AsyncCallback<List<Project>> callback) {
@@ -313,7 +316,7 @@ class ProjectAdminServiceImpl extends BaseServiceImplementation implements
case FORCED:
db.branches().delete(Collections.singleton(m));
deleted.add(mKey);
PushQueue.scheduleUpdate(mKey.getParentKey(), m.getName());
replication.scheduleUpdate(mKey.getParentKey(), m.getName());
break;
case REJECTED_CURRENT_BRANCH:
@@ -423,7 +426,7 @@ class ProjectAdminServiceImpl extends BaseServiceImplementation implements
case FAST_FORWARD:
case NEW:
case NO_CHANGE:
PushQueue.scheduleUpdate(name.getParentKey(), refname);
replication.scheduleUpdate(name.getParentKey(), refname);
break;
default: {
final String msg =

View File

@@ -17,8 +17,9 @@ package com.google.gerrit.server.ssh;
import com.google.gerrit.client.reviewdb.Project;
import com.google.gerrit.client.rpc.Common;
import com.google.gerrit.git.PushAllProjectsOp;
import com.google.gerrit.git.PushQueue;
import com.google.gerrit.git.ReplicationQueue;
import com.google.gerrit.git.WorkQueue;
import com.google.inject.Inject;
import org.kohsuke.args4j.Argument;
import org.kohsuke.args4j.Option;
@@ -38,6 +39,9 @@ class AdminReplicate extends AbstractCommand {
@Argument(index = 0, multiValued = true, metaVar = "PROJECT", usage = "project name")
private List<String> projectNames = new ArrayList<String>(2);
@Inject
private ReplicationQueue replication;
@Override
protected void run() throws Failure {
assertIsAdministrator();
@@ -46,19 +50,19 @@ class AdminReplicate extends AbstractCommand {
throw new Failure(1, "error: cannot combine --all and PROJECT");
}
if (!PushQueue.isReplicationEnabled()) {
if (!replication.isEnabled()) {
throw new Failure(1, "error: replication not enabled");
}
if (all) {
WorkQueue.schedule(new PushAllProjectsOp(getGerritServer(), urlMatch), 0,
TimeUnit.SECONDS);
WorkQueue.schedule(new PushAllProjectsOp(server, replication, urlMatch),
0, TimeUnit.SECONDS);
} else {
for (final String name : projectNames) {
final Project.NameKey key = new Project.NameKey(name);
if (Common.getProjectCache().get(key) != null) {
PushQueue.scheduleFullSync(key, urlMatch);
replication.scheduleFullSync(key, urlMatch);
} else {
throw new Failure(1, "error: '" + name + "': not a Gerrit project");
}

View File

@@ -41,7 +41,7 @@ import com.google.gerrit.client.reviewdb.PatchSetInfo;
import com.google.gerrit.client.reviewdb.ReviewDb;
import com.google.gerrit.client.rpc.Common;
import com.google.gerrit.git.PatchSetImporter;
import com.google.gerrit.git.PushQueue;
import com.google.gerrit.git.ReplicationQueue;
import com.google.gerrit.server.ChangeUtil;
import com.google.gerrit.server.mail.CreateChangeSender;
import com.google.gerrit.server.mail.EmailException;
@@ -50,6 +50,7 @@ import com.google.gerrit.server.mail.ReplacePatchSetSender;
import com.google.gwtorm.client.OrmException;
import com.google.gwtorm.client.OrmRunnable;
import com.google.gwtorm.client.Transaction;
import com.google.inject.Inject;
import org.kohsuke.args4j.CmdLineException;
import org.kohsuke.args4j.Option;
@@ -127,6 +128,9 @@ class Receive extends AbstractGitCommand {
}
}
@Inject
private ReplicationQueue replication;
private ReceivePack rp;
private PersonIdent refLogIdent;
private ReceiveCommand newChange;
@@ -192,7 +196,7 @@ class Receive extends AbstractGitCommand {
// We only schedule heads and tags for replication.
// Change refs are scheduled when they are created.
//
PushQueue.scheduleUpdate(proj.getNameKey(), c.getRefName());
replication.scheduleUpdate(proj.getNameKey(), c.getRefName());
}
}
}
@@ -755,7 +759,7 @@ class Receive extends AbstractGitCommand {
throw new IOException("Failed to create ref " + ps.getRefName() + " in "
+ repo.getDirectory() + ": " + ru.getResult());
}
PushQueue.scheduleUpdate(proj.getNameKey(), ru.getName());
replication.scheduleUpdate(proj.getNameKey(), ru.getName());
allNewChanges.add(change.getId());
@@ -1015,7 +1019,7 @@ class Receive extends AbstractGitCommand {
throw new IOException("Failed to create ref " + ps.getRefName()
+ " in " + repo.getDirectory() + ": " + ru.getResult());
}
PushQueue.scheduleUpdate(proj.getNameKey(), ru.getName());
replication.scheduleUpdate(proj.getNameKey(), ru.getName());
cmd.setResult(ReceiveCommand.Result.OK);
try {