From d08b045b7a1953f562dcaf9ce456b526f35e791c Mon Sep 17 00:00:00 2001 From: Dave Borowitz Date: Thu, 13 Feb 2014 11:56:03 -0800 Subject: [PATCH] Support committing Lucene writes within a fixed interval The ramBufferSize and maxBufferedDocs options control how often the writer is flushed, but this does not fsync files on disk and thus might not be permanent, particularly in a machine under heavy load. Add an option commitWithin to bound the time between calls to commit. Give each sub-index its own executor to flush and commit outstanding changes. Non-positive periods trigger commit on every write. Change-Id: Idfaf17a1eeb9240f25ee1fc842bdf4f8c9b873c3 --- Documentation/config-gerrit.txt | 14 ++ .../gerrit/lucene/AutoCommitWriter.java | 150 ++++++++++++++++++ .../gerrit/lucene/LuceneChangeIndex.java | 46 ++++-- .../com/google/gerrit/lucene/SubIndex.java | 57 ++++++- 4 files changed, 247 insertions(+), 20 deletions(-) create mode 100644 gerrit-lucene/src/main/java/com/google/gerrit/lucene/AutoCommitWriter.java diff --git a/Documentation/config-gerrit.txt b/Documentation/config-gerrit.txt index 733a9eb67a..66f9d82cfd 100644 --- a/Documentation/config-gerrit.txt +++ b/Documentation/config-gerrit.txt @@ -1880,6 +1880,20 @@ Lucene documentation] for further details. Defaults to -1, meaning no maximum is set and the writer will flush according to RAM usage. +[[index.name.commitWithin]]index.name.commitWithin:: ++ +Only used when the type is `LUCENE`. ++ +Determines the period at which changes are automatically committed to +stable store on disk. This is a costly operation and may block +additional index writes, so lower with caution. ++ +If zero or negative, changes are committed after every write. This is +very costly but may be useful if offline reindexing is infeasible, or +for development servers. ++ +Defaults to 300000 ms (5 minutes). + Sample index configuration: ---- [index] diff --git a/gerrit-lucene/src/main/java/com/google/gerrit/lucene/AutoCommitWriter.java b/gerrit-lucene/src/main/java/com/google/gerrit/lucene/AutoCommitWriter.java new file mode 100644 index 0000000000..fd14cd9c06 --- /dev/null +++ b/gerrit-lucene/src/main/java/com/google/gerrit/lucene/AutoCommitWriter.java @@ -0,0 +1,150 @@ +// Copyright (C) 2014 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.gerrit.lucene; + +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.IndexableField; +import org.apache.lucene.index.Term; +import org.apache.lucene.search.Query; +import org.apache.lucene.store.Directory; + +import java.io.IOException; + +/** Writer that optionally flushes/commits after every write. */ +class AutoCommitWriter extends IndexWriter { + private boolean autoCommit; + + AutoCommitWriter(Directory dir, IndexWriterConfig config, boolean autoCommit) + throws IOException { + super(dir, config); + this.autoCommit = autoCommit; + } + + @Override + public void addDocument(Iterable doc) + throws IOException { + super.addDocument(doc); + autoFlush(); + } + + @Override + public void addDocument(Iterable doc, + Analyzer analyzer) throws IOException { + super.addDocument(doc, analyzer); + autoFlush(); + } + + @Override + public void addDocuments( + Iterable> docs) + throws IOException { + super.addDocuments(docs); + autoFlush(); + } + + @Override + public void addDocuments( + Iterable> docs, + Analyzer analyzer) throws IOException { + super.addDocuments(docs, analyzer); + autoFlush(); + } + + @Override + public void updateDocuments(Term delTerm, + Iterable> docs) + throws IOException { + super.updateDocuments(delTerm, docs); + autoFlush(); + } + + @Override + public void updateDocuments(Term delTerm, + Iterable> docs, + Analyzer analyzer) throws IOException { + super.updateDocuments(delTerm, docs, analyzer); + autoFlush(); + } + + @Override + public void deleteDocuments(Term term) throws IOException { + super.deleteDocuments(term); + autoFlush(); + } + + @Override + public synchronized boolean tryDeleteDocument(IndexReader readerIn, int docID) + throws IOException { + boolean ret = super.tryDeleteDocument(readerIn, docID); + if (ret) { + autoFlush(); + } + return ret; + } + + @Override + public void deleteDocuments(Term... terms) throws IOException { + super.deleteDocuments(terms); + autoFlush(); + } + + @Override + public void deleteDocuments(Query query) throws IOException { + super.deleteDocuments(query); + autoFlush(); + } + + @Override + public void deleteDocuments(Query... queries) throws IOException { + super.deleteDocuments(queries); + autoFlush(); + } + + @Override + public void updateDocument(Term term, Iterable doc) + throws IOException { + super.updateDocument(term, doc); + autoFlush(); + } + + @Override + public void updateDocument(Term term, Iterable doc, + Analyzer analyzer) throws IOException { + super.updateDocument(term, doc, analyzer); + autoFlush(); + } + + @Override + public void deleteAll() throws IOException { + super.deleteAll(); + autoFlush(); + } + + void manualFlush() throws IOException { + flush(true, true); + if (autoCommit) { + commit(); + } + } + + private void autoFlush() throws IOException { + if (autoCommit) { + manualFlush(); + } + } +} diff --git a/gerrit-lucene/src/main/java/com/google/gerrit/lucene/LuceneChangeIndex.java b/gerrit-lucene/src/main/java/com/google/gerrit/lucene/LuceneChangeIndex.java index 2748963d72..5ce591fffb 100644 --- a/gerrit-lucene/src/main/java/com/google/gerrit/lucene/LuceneChangeIndex.java +++ b/gerrit-lucene/src/main/java/com/google/gerrit/lucene/LuceneChangeIndex.java @@ -17,6 +17,8 @@ package com.google.gerrit.lucene; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.gerrit.server.index.IndexRewriteImpl.CLOSED_STATUSES; import static com.google.gerrit.server.index.IndexRewriteImpl.OPEN_STATUSES; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.MINUTES; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -28,6 +30,8 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.gerrit.common.Nullable; import com.google.gerrit.reviewdb.client.Change; import com.google.gerrit.reviewdb.client.PatchSetApproval; + +import com.google.gerrit.server.config.ConfigUtil; import com.google.gerrit.server.config.GerritServerConfig; import com.google.gerrit.server.config.SitePaths; import com.google.gerrit.server.index.ChangeField; @@ -131,17 +135,33 @@ public class LuceneChangeIndex implements ChangeIndex { LuceneChangeIndex create(Schema schema, String base); } - private static IndexWriterConfig getIndexWriterConfig(Version version, - Config cfg, String name) { - IndexWriterConfig writerConfig = new IndexWriterConfig(version, - new StandardAnalyzer(version, CharArraySet.EMPTY_SET)); - writerConfig.setOpenMode(OpenMode.CREATE_OR_APPEND); - double m = 1 << 20; - writerConfig.setRAMBufferSizeMB(cfg.getLong("index", name, "ramBufferSize", + static class GerritIndexWriterConfig { + private final IndexWriterConfig luceneConfig; + private final long commitWithinMs; + + private GerritIndexWriterConfig(Version version, Config cfg, String name) { + luceneConfig = new IndexWriterConfig(version, + new StandardAnalyzer(version, CharArraySet.EMPTY_SET)); + luceneConfig.setOpenMode(OpenMode.CREATE_OR_APPEND); + double m = 1 << 20; + luceneConfig.setRAMBufferSizeMB(cfg.getLong( + "index", name, "ramBufferSize", (long) (IndexWriterConfig.DEFAULT_RAM_BUFFER_SIZE_MB * m)) / m); - writerConfig.setMaxBufferedDocs(cfg.getInt("index", name, "maxBufferedDocs", + luceneConfig.setMaxBufferedDocs(cfg.getInt( + "index", name, "maxBufferedDocs", IndexWriterConfig.DEFAULT_MAX_BUFFERED_DOCS)); - return writerConfig; + commitWithinMs = ConfigUtil.getTimeUnit( + cfg, "index", name, "commitWithin", + MILLISECONDS.convert(5, MINUTES), MILLISECONDS); + } + + IndexWriterConfig getLuceneConfig() { + return luceneConfig; + } + + long getCommitWithinMs() { + return commitWithinMs; + } } private final SitePaths sitePaths; @@ -174,10 +194,10 @@ public class LuceneChangeIndex implements ChangeIndex { LUCENE_VERSIONS.get(schema), "unknown Lucene version for index schema: %s", schema); - IndexWriterConfig openConfig = - getIndexWriterConfig(luceneVersion, cfg, "changes_open"); - IndexWriterConfig closedConfig = - getIndexWriterConfig(luceneVersion, cfg, "changes_closed"); + GerritIndexWriterConfig openConfig = + new GerritIndexWriterConfig(luceneVersion, cfg, "changes_open"); + GerritIndexWriterConfig closedConfig = + new GerritIndexWriterConfig(luceneVersion, cfg, "changes_closed"); if (cfg.getBoolean("index", "lucene", "testInmemory", false)) { openIndex = new SubIndex(new RAMDirectory(), "ramOpen", openConfig); closedIndex = new SubIndex(new RAMDirectory(), "ramClosed", closedConfig); diff --git a/gerrit-lucene/src/main/java/com/google/gerrit/lucene/SubIndex.java b/gerrit-lucene/src/main/java/com/google/gerrit/lucene/SubIndex.java index 655581d68c..34eb2de88a 100644 --- a/gerrit-lucene/src/main/java/com/google/gerrit/lucene/SubIndex.java +++ b/gerrit-lucene/src/main/java/com/google/gerrit/lucene/SubIndex.java @@ -14,13 +14,15 @@ package com.google.gerrit.lucene; +import static java.util.concurrent.TimeUnit.MILLISECONDS; + import com.google.common.collect.Maps; import com.google.common.util.concurrent.AbstractFuture; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.gerrit.lucene.LuceneChangeIndex.GerritIndexWriterConfig; import org.apache.lucene.document.Document; -import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.Term; import org.apache.lucene.index.TrackingIndexWriter; import org.apache.lucene.search.ControlledRealTimeReopenThread; @@ -38,6 +40,8 @@ import java.io.IOException; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -51,15 +55,54 @@ class SubIndex { private final SearcherManager searcherManager; private final ControlledRealTimeReopenThread reopenThread; private final ConcurrentMap refreshListeners; + private final ScheduledExecutorService commitExecutor; - SubIndex(File file, IndexWriterConfig writerConfig) throws IOException { + SubIndex(File file, GerritIndexWriterConfig writerConfig) throws IOException { this(FSDirectory.open(file), file.getName(), writerConfig); } - SubIndex(Directory dir, String dirName, IndexWriterConfig writerConfig) - throws IOException { + SubIndex(Directory dir, final String dirName, + GerritIndexWriterConfig writerConfig) throws IOException { this.dir = dir; - writer = new TrackingIndexWriter(new IndexWriter(dir, writerConfig)); + + final AutoCommitWriter delegateWriter; + long commitPeriod = writerConfig.getCommitWithinMs(); + if (commitPeriod <= 0) { + commitExecutor = null; + delegateWriter = + new AutoCommitWriter(dir, writerConfig.getLuceneConfig(), true); + } else { + commitExecutor = new ScheduledThreadPoolExecutor(1, + new ThreadFactoryBuilder() + .setNameFormat("Commit-%d " + dirName) + .setDaemon(true) + .build()); + delegateWriter = + new AutoCommitWriter(dir, writerConfig.getLuceneConfig(), false); + commitExecutor.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + if (delegateWriter.hasUncommittedChanges()) { + delegateWriter.manualFlush(); + delegateWriter.commit(); + } + } catch (IOException e) { + log.error("Error committing Lucene index " + dirName, e); + } catch (OutOfMemoryError e) { + log.error("Error committing Lucene index " + dirName, e); + try { + delegateWriter.close(); + } catch (IOException e2) { + log.error("SEVERE: Error closing Lucene index " + + dirName + " after OOM; index may be corrupted.", e); + } + } + } + }, commitPeriod, commitPeriod, MILLISECONDS); + } + + writer = new TrackingIndexWriter(delegateWriter); searcherManager = new SearcherManager( writer.getIndexWriter(), true, new SearcherFactory()); @@ -151,7 +194,7 @@ class SubIndex { TimeoutException, ExecutionException { if (!isDone()) { reopenThread.waitForGeneration(gen, - (int) TimeUnit.MILLISECONDS.convert(timeout, unit)); + (int) MILLISECONDS.convert(timeout, unit)); set(null); } return super.get(timeout, unit);