Merge branch 'stable-2.15'

* stable-2.15:
  ElasticIndexVersionDiscovery: Convert to Java stream API
  ElasticVersionManager: Use correct configuration value for prefix
  Elasticsearch: Refactor generation of bulk requests
  Rename AbstractReindexIT to AbstractReindexTests
  ElasticReindexIT: Use @ConfigSuite.Default instead of @ConfigSuite.Config

Change-Id: Iacd3f2db3728a9c1936170b575016973bf0336d9
This commit is contained in:
David Pursehouse
2018-05-31 21:27:46 +09:00
12 changed files with 258 additions and 78 deletions

View File

@@ -17,26 +17,22 @@ package com.google.gerrit.elasticsearch;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.gson.FieldNamingPolicy.LOWER_CASE_WITH_UNDERSCORES;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.stream.Collectors.toList;
import static org.apache.commons.codec.binary.Base64.decodeBase64;
import com.google.common.base.Strings;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Iterables;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Streams;
import com.google.common.io.CharStreams;
import com.google.gerrit.elasticsearch.builders.QueryBuilder;
import com.google.gerrit.elasticsearch.builders.SearchSourceBuilder;
import com.google.gerrit.elasticsearch.builders.XContentBuilder;
import com.google.gerrit.elasticsearch.bulk.DeleteRequest;
import com.google.gerrit.index.FieldDef;
import com.google.gerrit.index.FieldType;
import com.google.gerrit.index.Index;
import com.google.gerrit.index.QueryOptions;
import com.google.gerrit.index.Schema;
import com.google.gerrit.index.Schema.Values;
import com.google.gerrit.index.query.DataSource;
import com.google.gerrit.index.query.FieldBundle;
import com.google.gerrit.index.query.Predicate;
@@ -71,10 +67,7 @@ import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpEntity;
import org.apache.http.HttpStatus;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpHead;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ContentType;
import org.apache.http.nio.entity.NStringEntity;
import org.eclipse.jgit.lib.Config;
@@ -87,9 +80,7 @@ abstract class AbstractElasticIndex<K, V> implements Index<K, V> {
private static final Logger log = LoggerFactory.getLogger(AbstractElasticIndex.class);
protected static final String BULK = "_bulk";
protected static final String DELETE = "delete";
protected static final String IGNORE_UNMAPPED = "ignore_unmapped";
protected static final String INDEX = "index";
protected static final String ORDER = "order";
protected static final String SEARCH = "_search";
@@ -167,7 +158,7 @@ abstract class AbstractElasticIndex<K, V> implements Index<K, V> {
@Override
public void delete(K c) throws IOException {
String uri = getURI(indexNameRaw, BULK);
Response response = performRequest(HttpPost.METHOD_NAME, addActions(c), uri, getRefreshParam());
Response response = postRequest(addActions(c), uri, getRefreshParam());
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode != HttpStatus.SC_OK) {
throw new IOException(
@@ -178,10 +169,10 @@ abstract class AbstractElasticIndex<K, V> implements Index<K, V> {
@Override
public void deleteAll() throws IOException {
// Delete the index, if it exists.
Response response = client.performRequest(HttpHead.METHOD_NAME, indexName);
Response response = client.performRequest("HEAD", indexName);
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode == HttpStatus.SC_OK) {
response = client.performRequest(HttpDelete.METHOD_NAME, indexName);
response = client.performRequest("DELETE", indexName);
statusCode = response.getStatusLine().getStatusCode();
if (statusCode != HttpStatus.SC_OK) {
throw new IOException(
@@ -190,8 +181,7 @@ abstract class AbstractElasticIndex<K, V> implements Index<K, V> {
}
// Recreate the index.
response =
performRequest(HttpPut.METHOD_NAME, getMappings(), indexName, Collections.emptyMap());
response = performRequest("PUT", getMappings(), indexName, Collections.emptyMap());
statusCode = response.getStatusLine().getStatusCode();
if (statusCode != HttpStatus.SC_OK) {
String error = String.format("Failed to create index %s: %s", indexName, statusCode);
@@ -207,33 +197,7 @@ abstract class AbstractElasticIndex<K, V> implements Index<K, V> {
protected String delete(String type, K c) {
String id = c.toString();
return toAction(type, id, DELETE);
}
private static boolean shouldAddElement(Object element) {
return !(element instanceof String) || !((String) element).isEmpty();
}
protected String toDoc(V v) throws IOException {
try (XContentBuilder closeable = new XContentBuilder()) {
XContentBuilder builder = closeable.startObject();
for (Values<V> values : schema.buildFields(v)) {
String name = values.getField().getName();
if (values.getField().isRepeatable()) {
builder.field(
name,
Streams.stream(values.getValues())
.filter(AbstractElasticIndex::shouldAddElement)
.collect(toList()));
} else {
Object element = Iterables.getOnlyElement(values.getValues(), "");
if (shouldAddElement(element)) {
builder.field(name, element);
}
}
}
return builder.endObject().string() + System.lineSeparator();
}
return new DeleteRequest(id, indexNameRaw, type).toString();
}
protected abstract V fromDocument(JsonObject doc, Set<String> fields);
@@ -313,9 +277,15 @@ abstract class AbstractElasticIndex<K, V> implements Index<K, V> {
return encodedIndexName + "/" + encodedType + "/" + request;
}
protected Response performRequest(
String method, String payload, String uri, Map<String, String> params) throws IOException {
HttpEntity entity = new NStringEntity(payload, ContentType.APPLICATION_JSON);
protected Response postRequest(Object payload, String uri, Map<String, String> params)
throws IOException {
return performRequest("POST", payload, uri, params);
}
private Response performRequest(
String method, Object payload, String uri, Map<String, String> params) throws IOException {
String payloadStr = payload instanceof String ? (String) payload : payload.toString();
HttpEntity entity = new NStringEntity(payloadStr, ContentType.APPLICATION_JSON);
return client.performRequest(method, uri, params, entity);
}

View File

@@ -18,6 +18,9 @@ import static com.google.gerrit.server.index.account.AccountField.ID;
import com.google.common.collect.ImmutableMap;
import com.google.gerrit.elasticsearch.ElasticMapping.MappingProperties;
import com.google.gerrit.elasticsearch.bulk.BulkRequest;
import com.google.gerrit.elasticsearch.bulk.IndexRequest;
import com.google.gerrit.elasticsearch.bulk.UpdateRequest;
import com.google.gerrit.index.QueryOptions;
import com.google.gerrit.index.Schema;
import com.google.gerrit.index.query.DataSource;
@@ -40,7 +43,6 @@ import com.google.inject.assistedinject.Assisted;
import java.io.IOException;
import java.util.Set;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.HttpPost;
import org.eclipse.jgit.lib.Config;
import org.elasticsearch.client.Response;
@@ -58,6 +60,7 @@ public class ElasticAccountIndex extends AbstractElasticIndex<Account.Id, Accoun
private final AccountMapping mapping;
private final Provider<AccountCache> accountCache;
private final Schema<AccountState> schema;
@Inject
ElasticAccountIndex(
@@ -69,15 +72,16 @@ public class ElasticAccountIndex extends AbstractElasticIndex<Account.Id, Accoun
super(cfg, sitePaths, schema, clientBuilder, ACCOUNTS);
this.accountCache = accountCache;
this.mapping = new AccountMapping(schema);
this.schema = schema;
}
@Override
public void replace(AccountState as) throws IOException {
String bulk = toAction(ACCOUNTS, getId(as), INDEX);
bulk += toDoc(as);
BulkRequest bulk =
new IndexRequest(getId(as), indexName, ACCOUNTS).add(new UpdateRequest<>(schema, as));
String uri = getURI(ACCOUNTS, BULK);
Response response = performRequest(HttpPost.METHOD_NAME, bulk, uri, getRefreshParam());
Response response = postRequest(bulk, uri, getRefreshParam());
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode != HttpStatus.SC_OK) {
throw new IOException(

View File

@@ -32,6 +32,10 @@ import com.google.common.collect.Lists;
import com.google.common.collect.MultimapBuilder;
import com.google.common.collect.Sets;
import com.google.gerrit.elasticsearch.ElasticMapping.MappingProperties;
import com.google.gerrit.elasticsearch.bulk.BulkRequest;
import com.google.gerrit.elasticsearch.bulk.DeleteRequest;
import com.google.gerrit.elasticsearch.bulk.IndexRequest;
import com.google.gerrit.elasticsearch.bulk.UpdateRequest;
import com.google.gerrit.index.QueryOptions;
import com.google.gerrit.index.Schema;
import com.google.gerrit.index.query.DataSource;
@@ -67,7 +71,6 @@ import java.util.Optional;
import java.util.Set;
import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.HttpPost;
import org.eclipse.jgit.lib.Config;
import org.elasticsearch.client.Response;
@@ -92,6 +95,7 @@ class ElasticChangeIndex extends AbstractElasticIndex<Change.Id, ChangeData>
private final ChangeMapping mapping;
private final Provider<ReviewDb> db;
private final ChangeData.Factory changeDataFactory;
private final Schema<ChangeData> schema;
@Inject
ElasticChangeIndex(
@@ -104,6 +108,7 @@ class ElasticChangeIndex extends AbstractElasticIndex<Change.Id, ChangeData>
super(cfg, sitePaths, schema, clientBuilder, CHANGES);
this.db = db;
this.changeDataFactory = changeDataFactory;
this.schema = schema;
mapping = new ChangeMapping(schema);
}
@@ -124,12 +129,13 @@ class ElasticChangeIndex extends AbstractElasticIndex<Change.Id, ChangeData>
throw new IOException(e);
}
String bulk = toAction(insertIndex, getId(cd), INDEX);
bulk += toDoc(cd);
bulk += toAction(deleteIndex, cd.getId().toString(), DELETE);
BulkRequest bulk =
new IndexRequest(getId(cd), indexName, insertIndex)
.add(new UpdateRequest<>(schema, cd))
.add(new DeleteRequest(cd.getId().toString(), indexName, deleteIndex));
String uri = getURI(CHANGES, BULK);
Response response = performRequest(HttpPost.METHOD_NAME, bulk, uri, getRefreshParam());
Response response = postRequest(bulk, uri, getRefreshParam());
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode != HttpStatus.SC_OK) {
throw new IOException(

View File

@@ -16,6 +16,9 @@ package com.google.gerrit.elasticsearch;
import com.google.common.collect.ImmutableMap;
import com.google.gerrit.elasticsearch.ElasticMapping.MappingProperties;
import com.google.gerrit.elasticsearch.bulk.BulkRequest;
import com.google.gerrit.elasticsearch.bulk.IndexRequest;
import com.google.gerrit.elasticsearch.bulk.UpdateRequest;
import com.google.gerrit.index.QueryOptions;
import com.google.gerrit.index.Schema;
import com.google.gerrit.index.query.DataSource;
@@ -38,7 +41,6 @@ import com.google.inject.assistedinject.Assisted;
import java.io.IOException;
import java.util.Set;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.HttpPost;
import org.eclipse.jgit.lib.Config;
import org.elasticsearch.client.Response;
@@ -56,6 +58,7 @@ public class ElasticGroupIndex extends AbstractElasticIndex<AccountGroup.UUID, I
private final GroupMapping mapping;
private final Provider<GroupCache> groupCache;
private final Schema<InternalGroup> schema;
@Inject
ElasticGroupIndex(
@@ -67,15 +70,16 @@ public class ElasticGroupIndex extends AbstractElasticIndex<AccountGroup.UUID, I
super(cfg, sitePaths, schema, clientBuilder, GROUPS);
this.groupCache = groupCache;
this.mapping = new GroupMapping(schema);
this.schema = schema;
}
@Override
public void replace(InternalGroup group) throws IOException {
String bulk = toAction(GROUPS, getId(group), INDEX);
bulk += toDoc(group);
BulkRequest bulk =
new IndexRequest(getId(group), indexName, GROUPS).add(new UpdateRequest<>(schema, group));
String uri = getURI(GROUPS, BULK);
Response response = performRequest(HttpPost.METHOD_NAME, bulk, uri, getRefreshParam());
Response response = postRequest(bulk, uri, getRefreshParam());
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode != HttpStatus.SC_OK) {
throw new IOException(

View File

@@ -14,16 +14,14 @@
package com.google.gerrit.elasticsearch;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import static java.util.stream.Collectors.toList;
import com.google.gson.JsonParser;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map.Entry;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.HttpGet;
import org.elasticsearch.client.Response;
@@ -42,16 +40,14 @@ class ElasticIndexVersionDiscovery {
String name = prefix + indexName + "_";
Response response = client.performRequest(HttpGet.METHOD_NAME, name + "*/_aliases");
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode == HttpStatus.SC_OK) {
String content = AbstractElasticIndex.getContent(response);
JsonObject object = new JsonParser().parse(content).getAsJsonObject();
List<String> versions = new ArrayList<>(object.size());
for (Entry<String, JsonElement> entry : object.entrySet()) {
versions.add(entry.getKey().replace(name, ""));
}
return versions;
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
return new JsonParser()
.parse(AbstractElasticIndex.getContent(response))
.getAsJsonObject()
.entrySet()
.stream()
.map(e -> e.getKey().replace(name, ""))
.collect(toList());
}
return Collections.emptyList();
}

View File

@@ -16,6 +16,9 @@ package com.google.gerrit.elasticsearch;
import com.google.common.collect.ImmutableMap;
import com.google.gerrit.elasticsearch.ElasticMapping.MappingProperties;
import com.google.gerrit.elasticsearch.bulk.BulkRequest;
import com.google.gerrit.elasticsearch.bulk.IndexRequest;
import com.google.gerrit.elasticsearch.bulk.UpdateRequest;
import com.google.gerrit.index.QueryOptions;
import com.google.gerrit.index.Schema;
import com.google.gerrit.index.project.ProjectData;
@@ -38,7 +41,6 @@ import com.google.inject.assistedinject.Assisted;
import java.io.IOException;
import java.util.Set;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.HttpPost;
import org.eclipse.jgit.lib.Config;
import org.elasticsearch.client.Response;
@@ -56,6 +58,7 @@ public class ElasticProjectIndex extends AbstractElasticIndex<Project.NameKey, P
private final ProjectMapping mapping;
private final Provider<ProjectCache> projectCache;
private final Schema<ProjectData> schema;
@Inject
ElasticProjectIndex(
@@ -66,16 +69,18 @@ public class ElasticProjectIndex extends AbstractElasticIndex<Project.NameKey, P
@Assisted Schema<ProjectData> schema) {
super(cfg, sitePaths, schema, clientBuilder, PROJECTS);
this.projectCache = projectCache;
this.schema = schema;
this.mapping = new ProjectMapping(schema);
}
@Override
public void replace(ProjectData projectState) throws IOException {
String bulk = toAction(PROJECTS, getId(projectState), INDEX);
bulk += toDoc(projectState);
BulkRequest bulk =
new IndexRequest(projectState.getProject().getName(), indexName, PROJECTS)
.add(new UpdateRequest<>(schema, projectState));
String uri = getURI(PROJECTS, BULK);
Response response = performRequest(HttpPost.METHOD_NAME, bulk, uri, getRefreshParam());
Response response = postRequest(bulk, uri, getRefreshParam());
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode != HttpStatus.SC_OK) {
throw new IOException(

View File

@@ -14,7 +14,7 @@
package com.google.gerrit.elasticsearch;
import com.google.common.base.MoreObjects;
import com.google.common.base.Strings;
import com.google.common.primitives.Ints;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.index.Index;
@@ -50,7 +50,7 @@ public class ElasticVersionManager extends VersionManager {
ElasticIndexVersionDiscovery versionDiscovery) {
super(sitePaths, listeners, defs, VersionManager.getOnlineUpgrade(cfg));
this.versionDiscovery = versionDiscovery;
prefix = MoreObjects.firstNonNull(cfg.getString("index", null, "prefix"), "gerrit");
prefix = Strings.nullToEmpty(cfg.getString("elasticsearch", null, "prefix"));
}
@Override

View File

@@ -0,0 +1,44 @@
// Copyright (C) 2018 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.elasticsearch.bulk;
import com.google.gson.JsonObject;
abstract class ActionRequest extends BulkRequest {
private final String action;
private final String id;
private final String index;
private final String type;
protected ActionRequest(String action, String id, String index, String type) {
this.action = action;
this.id = id;
this.index = index;
this.type = type;
}
@Override
protected String getRequest() {
JsonObject properties = new JsonObject();
properties.addProperty("_id", id);
properties.addProperty("_index", index);
properties.addProperty("_type", type);
JsonObject jsonAction = new JsonObject();
jsonAction.add(action, properties);
return jsonAction.toString() + System.lineSeparator();
}
}

View File

@@ -0,0 +1,43 @@
// Copyright (C) 2018 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.elasticsearch.bulk;
import java.util.ArrayList;
import java.util.List;
public abstract class BulkRequest {
private final List<BulkRequest> requests = new ArrayList<>();
protected BulkRequest() {
add(this);
}
public BulkRequest add(BulkRequest request) {
requests.add(request);
return this;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
for (BulkRequest request : requests) {
builder.append(request.getRequest());
}
return builder.toString();
}
protected abstract String getRequest();
}

View File

@@ -0,0 +1,22 @@
// Copyright (C) 2018 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.elasticsearch.bulk;
public class DeleteRequest extends ActionRequest {
public DeleteRequest(String id, String index, String type) {
super("delete", id, index, type);
}
}

View File

@@ -0,0 +1,22 @@
// Copyright (C) 2018 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.elasticsearch.bulk;
public class IndexRequest extends ActionRequest {
public IndexRequest(String id, String index, String type) {
super("index", id, index, type);
}
}

View File

@@ -0,0 +1,64 @@
// Copyright (C) 2018 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.elasticsearch.bulk;
import static java.util.stream.Collectors.toList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Streams;
import com.google.gerrit.elasticsearch.builders.XContentBuilder;
import com.google.gerrit.index.Schema;
import com.google.gerrit.index.Schema.Values;
import java.io.IOException;
public class UpdateRequest<V> extends BulkRequest {
private final Schema<V> schema;
private final V v;
public UpdateRequest(Schema<V> schema, V v) {
this.schema = schema;
this.v = v;
}
@Override
protected String getRequest() {
try (XContentBuilder closeable = new XContentBuilder()) {
XContentBuilder builder = closeable.startObject();
for (Values<V> values : schema.buildFields(v)) {
String name = values.getField().getName();
if (values.getField().isRepeatable()) {
builder.field(
name,
Streams.stream(values.getValues())
.filter(e -> shouldAddElement(e))
.collect(toList()));
} else {
Object element = Iterables.getOnlyElement(values.getValues(), "");
if (shouldAddElement(element)) {
builder.field(name, element);
}
}
}
return builder.endObject().string() + System.lineSeparator();
} catch (IOException e) {
return e.toString();
}
}
private boolean shouldAddElement(Object element) {
return !(element instanceof String) || !((String) element).isEmpty();
}
}