Run coordinating can_match in field-caps (#127734)

Currently, field-caps does not run the coordinating can_match phase to
skip unmatched shards. Most of the time this is fine, but a field-caps
request fails when target shards are unavailable, even if those shards
cannot match the index filter. This change integrates the coordinating
can_match phase into field-caps to prevent failures in such cases.
This commit is contained in:
Nhat Nguyen 2025-05-06 09:52:47 -07:00 committed by GitHub
parent f912323068
commit 73f5125d79
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 267 additions and 6 deletions

View File

@ -0,0 +1,5 @@
pr: 127734
summary: Run coordinating `can_match` in field-caps
area: ES|QL
type: enhancement
issues: []

View File

@ -0,0 +1,181 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.action.fieldcaps;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.PointValues;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.shard.IndexLongFieldRange;
import org.elasticsearch.index.shard.ShardLongFieldRange;
import org.elasticsearch.plugins.EnginePlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Optional;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
public class FieldCapsWithFilterIT extends ESIntegTestCase {
/**
 * Returns {@code false} so that the test framework does not add its mock internal engine.
 * NOTE(review): presumably needed so that the engine factory supplied by {@code ExposingTimestampEnginePlugin}
 * is the one used by the nodes in this test — confirm against the test framework.
 */
@Override
protected boolean addMockInternalEngine() {
return false;
}
/**
 * An {@link InternalEngine} for write-blocked indices that derives the raw range of a field from the minimum and
 * maximum point values found in the current reader.
 */
private static class EngineWithExposingTimestamp extends InternalEngine {

    EngineWithExposingTimestamp(EngineConfig engineConfig) {
        super(engineConfig);
        // This engine is only meant to be created for indices that have the write block set.
        assert IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.get(config().getIndexSettings().getSettings()) : "require read-only index";
    }

    /**
     * Reads the smallest and largest packed point value of {@code field} and decodes their first dimension as longs.
     *
     * @param field name of the field whose range is requested
     * @return the decoded {@code [min, max]} range, or {@link ShardLongFieldRange#EMPTY} when the reader has no
     *         point values for the field
     * @throws UncheckedIOException if reading the point values fails with an {@link IOException}
     */
    @Override
    public ShardLongFieldRange getRawFieldRange(String field) {
        try (Searcher searcher = acquireSearcher("test")) {
            final DirectoryReader reader = searcher.getDirectoryReader();
            final byte[] packedMin = PointValues.getMinPackedValue(reader, field);
            final byte[] packedMax = PointValues.getMaxPackedValue(reader, field);
            if (packedMin != null && packedMax != null) {
                final long min = LongPoint.decodeDimension(packedMin, 0);
                final long max = LongPoint.decodeDimension(packedMax, 0);
                return ShardLongFieldRange.of(min, max);
            }
            // Either both bounds are present or neither is; anything else is unexpected.
            assert packedMin == null && packedMax == null : Arrays.toString(packedMin) + "-" + Arrays.toString(packedMax);
            return ShardLongFieldRange.EMPTY;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
/**
 * Engine plugin that selects the engine factory per index: indices with the write block set get
 * {@link EngineWithExposingTimestamp}, all other indices get a plain {@link InternalEngineFactory}.
 */
public static class ExposingTimestampEnginePlugin extends Plugin implements EnginePlugin {

    @Override
    public Optional<EngineFactory> getEngineFactory(IndexSettings indexSettings) {
        final boolean writeBlocked = IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.get(indexSettings.getSettings());
        if (writeBlocked == false) {
            return Optional.of(new InternalEngineFactory());
        }
        return Optional.of(EngineWithExposingTimestamp::new);
    }
}
/**
 * Returns the plugins of the parent test case with {@link ExposingTimestampEnginePlugin} appended.
 */
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
    final Collection<Class<? extends Plugin>> inherited = super.nodePlugins();
    return CollectionUtils.appendToCopy(inherited, ExposingTimestampEnginePlugin.class);
}
/**
 * Creates {@code index} with a date {@code @timestamp} field and a long {@code position} field, then indexes a random
 * number of documents (between 100 and 500) whose timestamps start at {@code timestamp} and grow by one per document.
 *
 * @param index           name of the index to create
 * @param indexSettings   settings applied when the index is created
 * @param timestamp       {@code @timestamp} value of the first document
 * @param exposeTimestamp when {@code true}, the index is closed, given a write block and reopened, and the method
 *                        waits until the timestamp range in the index metadata covers all shard ranges; when
 *                        {@code false}, the index is only refreshed
 */
void createIndexAndIndexDocs(String index, Settings.Builder indexSettings, long timestamp, boolean exposeTimestamp) throws Exception {
    final Client client = client();
    assertAcked(
        client.admin()
            .indices()
            .prepareCreate(index)
            .setSettings(indexSettings)
            .setMapping("@timestamp", "type=date", "position", "type=long")
    );

    final int docCount = between(100, 500);
    for (int position = 0; position < docCount; position++) {
        final long docTimestamp = timestamp + position;
        client.prepareIndex(index).setSource("position", position, "@timestamp", docTimestamp).get();
    }

    if (exposeTimestamp == false) {
        client.admin().indices().prepareRefresh(index).get();
        return;
    }

    // Close, add the write block, and reopen the index.
    final Settings writeBlock = Settings.builder().put(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey(), true).build();
    client.admin().indices().prepareClose(index).get();
    client.admin().indices().prepareUpdateSettings(index).setSettings(writeBlock).get();
    client.admin().indices().prepareOpen(index).get();

    // Wait until the index metadata reports a timestamp range that includes every shard.
    assertBusy(() -> {
        final IndexLongFieldRange range = clusterService().state().metadata().getProject().index(index).getTimestampRange();
        assertTrue(Strings.toString(range), range.containsAllShardRanges());
    });
}
/**
 * Checks how field-caps treats an index whose only node has been stopped.
 * <p>
 * {@code index_old} is allocated to one data node, write-blocked via {@code createIndexAndIndexDocs(..., true)} and
 * then loses its node; {@code index_new} is allocated to a second data node that stays up. The test asserts that
 * {@code index_old} is reported as a failure only when the request has no index filter or a filter starting at the
 * old timestamp, and that it is silently skipped when the filter starts at or beyond the new timestamp.
 */
public void testSkipUnmatchedShards() throws Exception {
    final long oldTimestamp = randomLongBetween(10_000_000, 20_000_000);
    final long newTimestamp = randomLongBetween(30_000_000, 50_000_000);
    final String redNode = internalCluster().startDataOnlyNode();
    final String blueNode = internalCluster().startDataOnlyNode();

    // index_old lives on the red node only; stopping that node makes all of its shards unavailable.
    final Settings.Builder oldIndexSettings = indexSettings(between(1, 5), 0).put("index.routing.allocation.include._name", redNode);
    createIndexAndIndexDocs("index_old", oldIndexSettings, oldTimestamp, true);
    internalCluster().stopNode(redNode);

    // index_new lives on the blue node, which stays available.
    final Settings.Builder newIndexSettings = indexSettings(between(1, 5), 0).put("index.routing.allocation.include._name", blueNode);
    createIndexAndIndexDocs("index_new", newIndexSettings, newTimestamp, false);

    // No index filter, or a filter starting at the old timestamp: the unavailable index_old is reported as a failure.
    {
        final FieldCapabilitiesRequest fieldCapsRequest = new FieldCapabilitiesRequest();
        fieldCapsRequest.indices("index_*");
        fieldCapsRequest.fields("*");
        fieldCapsRequest.setMergeResults(false);
        if (randomBoolean()) {
            fieldCapsRequest.indexFilter(new RangeQueryBuilder("@timestamp").from(oldTimestamp));
        }
        final FieldCapabilitiesResponse fieldCapsResponse = safeGet(
            client().execute(TransportFieldCapabilitiesAction.TYPE, fieldCapsRequest)
        );
        final var indexResponses = fieldCapsResponse.getIndexResponses();
        assertThat(indexResponses, hasSize(1));
        assertThat(indexResponses.get(0).getIndexName(), equalTo("index_new"));
        final var failures = fieldCapsResponse.getFailures();
        assertThat(failures, hasSize(1));
        assertThat(failures.get(0).getIndices(), equalTo(new String[] { "index_old" }));
        assertThat(failures.get(0).getException(), instanceOf(NoShardAvailableActionException.class));
    }

    // Filter starting at the new timestamp: the unavailable index_old is skipped instead of failing the request.
    {
        final FieldCapabilitiesRequest fieldCapsRequest = new FieldCapabilitiesRequest();
        fieldCapsRequest.indices("index_*");
        fieldCapsRequest.fields("*");
        fieldCapsRequest.indexFilter(new RangeQueryBuilder("@timestamp").from(newTimestamp));
        fieldCapsRequest.setMergeResults(false);
        final FieldCapabilitiesResponse fieldCapsResponse = safeGet(
            client().execute(TransportFieldCapabilitiesAction.TYPE, fieldCapsRequest)
        );
        final var indexResponses = fieldCapsResponse.getIndexResponses();
        assertThat(indexResponses, hasSize(1));
        assertThat(indexResponses.get(0).getIndexName(), equalTo("index_new"));
        assertThat(fieldCapsResponse.getFailures(), empty());
    }

    // Filter starting well beyond both timestamp ranges: no index responses and no failures
    // (presumably index_old is skipped on the coordinator and index_new on its data node).
    {
        final FieldCapabilitiesRequest fieldCapsRequest = new FieldCapabilitiesRequest();
        fieldCapsRequest.indices("index_*");
        fieldCapsRequest.fields("*");
        fieldCapsRequest.indexFilter(new RangeQueryBuilder("@timestamp").from(newTimestamp * 2L));
        fieldCapsRequest.setMergeResults(false);
        final FieldCapabilitiesResponse fieldCapsResponse = safeGet(
            client().execute(TransportFieldCapabilitiesAction.TYPE, fieldCapsRequest)
        );
        assertThat(fieldCapsResponse.getIndexResponses(), empty());
        assertThat(fieldCapsResponse.getFailures(), empty());
    }
}
}

View File

@ -906,6 +906,10 @@ public class FieldCapabilitiesIT extends ESIntegTestCase {
@Override
protected QueryBuilder doRewrite(QueryRewriteContext queryRewriteContext) throws IOException {
// skip rewriting on the coordinator
if (queryRewriteContext.convertToCoordinatorRewriteContext() != null) {
return this;
}
try {
blockingLatch.await();
} catch (InterruptedException e) {

View File

@ -246,6 +246,7 @@ public class TransportVersions {
public static final TransportVersion WRITE_LOAD_INCLUDES_BUFFER_WRITES = def(9_070_00_0);
public static final TransportVersion INTRODUCE_FAILURES_DEFAULT_RETENTION = def(9_071_0_00);
public static final TransportVersion FILE_SETTINGS_HEALTH_INFO = def(9_072_0_00);
public static final TransportVersion FIELD_CAPS_ADD_CLUSTER_ALIAS = def(9_073_0_00);
/*
* STOP! READ THIS FIRST! No, really,

View File

@ -22,6 +22,7 @@ import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
@ -37,6 +38,8 @@ public final class FieldCapabilitiesRequest extends ActionRequest implements Ind
public static final String NAME = "field_caps_request";
public static final IndicesOptions DEFAULT_INDICES_OPTIONS = IndicesOptions.strictExpandOpenAndForbidClosed();
private String clusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY;
private String[] indices = Strings.EMPTY_ARRAY;
private IndicesOptions indicesOptions = DEFAULT_INDICES_OPTIONS;
private String[] fields = Strings.EMPTY_ARRAY;
@ -67,6 +70,11 @@ public final class FieldCapabilitiesRequest extends ActionRequest implements Ind
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0)) {
includeEmptyFields = in.readBoolean();
}
if (in.getTransportVersion().onOrAfter(TransportVersions.FIELD_CAPS_ADD_CLUSTER_ALIAS)) {
clusterAlias = in.readOptionalString();
} else {
clusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY;
}
}
public FieldCapabilitiesRequest() {}
@ -90,6 +98,14 @@ public final class FieldCapabilitiesRequest extends ActionRequest implements Ind
this.mergeResults = mergeResults;
}
/**
 * Sets the cluster alias stored on this request. The value is written with {@code writeOptionalString} in
 * {@code writeTo}, and only for transport versions on or after {@code FIELD_CAPS_ADD_CLUSTER_ALIAS}.
 *
 * @param clusterAlias the alias to store
 */
void clusterAlias(String clusterAlias) {
this.clusterAlias = clusterAlias;
}
/**
 * Returns the cluster alias stored on this request. The field is initialised to
 * {@code RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY}, which is also used when the request is read from a stream
 * older than {@code FIELD_CAPS_ADD_CLUSTER_ALIAS}.
 */
String clusterAlias() {
return clusterAlias;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
@ -108,6 +124,9 @@ public final class FieldCapabilitiesRequest extends ActionRequest implements Ind
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0)) {
out.writeBoolean(includeEmptyFields);
}
if (out.getTransportVersion().onOrAfter(TransportVersions.FIELD_CAPS_ADD_CLUSTER_ALIAS)) {
out.writeOptionalString(clusterAlias);
}
}
@Override

View File

@ -26,8 +26,14 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.RunOnce;
import org.elasticsearch.index.query.CoordinatorRewriteContextProvider;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
@ -72,6 +78,7 @@ final class RequestDispatcher {
ClusterService clusterService,
TransportService transportService,
ProjectResolver projectResolver,
CoordinatorRewriteContextProvider coordinatorRewriteContextProvider,
Task parentTask,
FieldCapabilitiesRequest fieldCapsRequest,
OriginalIndices originalIndices,
@ -105,8 +112,14 @@ final class RequestDispatcher {
onIndexFailure.accept(index, e);
continue;
}
final IndexSelector indexResult = new IndexSelector(shardIts);
if (indexResult.nodeToShards.isEmpty()) {
final IndexSelector indexResult = new IndexSelector(
fieldCapsRequest.clusterAlias(),
shardIts,
fieldCapsRequest.indexFilter(),
nowInMillis,
coordinatorRewriteContextProvider
);
if (indexResult.nodeToShards.isEmpty() && indexResult.unmatchedShardIds.isEmpty()) {
onIndexFailure.accept(index, new NoShardAvailableActionException(null, "index [" + index + "] has no active shard copy"));
} else {
this.indexSelectors.put(index, indexResult);
@ -255,10 +268,34 @@ final class RequestDispatcher {
private final Set<ShardId> unmatchedShardIds = new HashSet<>();
private final Map<ShardId, Exception> failures = new HashMap<>();
IndexSelector(List<ShardIterator> shardIts) {
IndexSelector(
String clusterAlias,
List<ShardIterator> shardIts,
QueryBuilder indexFilter,
long nowInMillis,
CoordinatorRewriteContextProvider coordinatorRewriteContextProvider
) {
for (ShardIterator shardIt : shardIts) {
for (ShardRouting shard : shardIt) {
nodeToShards.computeIfAbsent(shard.currentNodeId(), node -> new ArrayList<>()).add(shard);
boolean canMatch = true;
final ShardId shardId = shardIt.shardId();
if (indexFilter != null && indexFilter instanceof MatchAllQueryBuilder == false) {
var coordinatorRewriteContext = coordinatorRewriteContextProvider.getCoordinatorRewriteContext(shardId.getIndex());
if (coordinatorRewriteContext != null) {
var shardRequest = new ShardSearchRequest(shardId, nowInMillis, AliasFilter.EMPTY, clusterAlias);
shardRequest.source(new SearchSourceBuilder().query(indexFilter));
try {
canMatch = SearchService.queryStillMatchesAfterRewrite(shardRequest, coordinatorRewriteContext);
} catch (Exception e) {
// treat as if shard is still a potential match
}
}
}
if (canMatch) {
for (ShardRouting shard : shardIt) {
nodeToShards.computeIfAbsent(shard.currentNodeId(), node -> new ArrayList<>()).add(shard);
}
} else {
unmatchedShardIds.add(shardId);
}
}
}

View File

@ -250,6 +250,7 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
clusterService,
transportService,
projectResolver,
indicesService.getCoordinatorRewriteContextProvider(() -> nowInMillis),
task,
request,
localIndices,
@ -273,7 +274,7 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
singleThreadedExecutor,
RemoteClusterService.DisconnectedStrategy.RECONNECT_UNLESS_SKIP_UNAVAILABLE
);
FieldCapabilitiesRequest remoteRequest = prepareRemoteRequest(request, originalIndices, nowInMillis);
FieldCapabilitiesRequest remoteRequest = prepareRemoteRequest(clusterAlias, request, originalIndices, nowInMillis);
ActionListener<FieldCapabilitiesResponse> remoteListener = ActionListener.wrap(response -> {
for (FieldCapabilitiesIndexResponse resp : response.getIndexResponses()) {
String indexName = RemoteClusterAware.buildRemoteIndexName(clusterAlias, resp.getIndexName());
@ -383,11 +384,13 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
}
private static FieldCapabilitiesRequest prepareRemoteRequest(
String clusterAlias,
FieldCapabilitiesRequest request,
OriginalIndices originalIndices,
long nowInMillis
) {
FieldCapabilitiesRequest remoteRequest = new FieldCapabilitiesRequest();
remoteRequest.clusterAlias(clusterAlias);
remoteRequest.setMergeResults(false); // we need to merge on this node
remoteRequest.indicesOptions(originalIndices.indicesOptions());
remoteRequest.indices(originalIndices.indices());

View File

@ -49,6 +49,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.IndexVersions;
import org.elasticsearch.index.query.CoordinatorRewriteContextProvider;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.shard.ShardId;
@ -135,6 +136,7 @@ public class RequestDispatcherTests extends ESAllocationTestCase {
mockClusterService(clusterState),
transportService,
TestProjectResolvers.singleProject(projectId),
coordinatorRewriteContextProvider(),
newRandomParentTask(),
randomFieldCapRequest(withFilter),
OriginalIndices.NONE,
@ -206,6 +208,7 @@ public class RequestDispatcherTests extends ESAllocationTestCase {
mockClusterService(clusterState),
transportService,
TestProjectResolvers.singleProject(projectId),
coordinatorRewriteContextProvider(),
newRandomParentTask(),
randomFieldCapRequest(withFilter),
OriginalIndices.NONE,
@ -328,6 +331,7 @@ public class RequestDispatcherTests extends ESAllocationTestCase {
mockClusterService(clusterState),
transportService,
TestProjectResolvers.singleProject(projectId),
coordinatorRewriteContextProvider(),
newRandomParentTask(),
randomFieldCapRequest(withFilter),
OriginalIndices.NONE,
@ -452,6 +456,7 @@ public class RequestDispatcherTests extends ESAllocationTestCase {
mockClusterService(clusterState),
transportService,
TestProjectResolvers.singleProject(projectId),
coordinatorRewriteContextProvider(),
newRandomParentTask(),
randomFieldCapRequest(withFilter),
OriginalIndices.NONE,
@ -550,6 +555,7 @@ public class RequestDispatcherTests extends ESAllocationTestCase {
mockClusterService(clusterState),
transportService,
TestProjectResolvers.singleProject(projectId),
coordinatorRewriteContextProvider(),
newRandomParentTask(),
randomFieldCapRequest(withFilter),
OriginalIndices.NONE,
@ -642,6 +648,7 @@ public class RequestDispatcherTests extends ESAllocationTestCase {
mockClusterService(clusterState),
transportService,
TestProjectResolvers.singleProject(projectId),
coordinatorRewriteContextProvider(),
newRandomParentTask(),
randomFieldCapRequest(withFilter),
OriginalIndices.NONE,
@ -1032,4 +1039,8 @@ public class RequestDispatcherTests extends ESAllocationTestCase {
when(clusterService.operationRouting()).thenReturn(operationRouting);
return clusterService;
}
/**
 * Returns a fresh mock of {@link CoordinatorRewriteContextProvider} for constructing a {@code RequestDispatcher} in tests.
 * NOTE(review): no behaviour is stubbed here; presumably the mock yields a {@code null} rewrite context so the
 * coordinator can_match check is bypassed in these tests — confirm against the mocking library's defaults.
 */
static CoordinatorRewriteContextProvider coordinatorRewriteContextProvider() {
return mock(CoordinatorRewriteContextProvider.class);
}
}