Include failures in partial response (#124929)
This change includes failures when ESQL returns partial results. It also carries failures between cluster requests. Relates #122802
This commit is contained in:
parent
36874e8663
commit
6b6fc8028d
|
@ -0,0 +1,5 @@
|
|||
pr: 124929
|
||||
summary: Include failures in partial response
|
||||
area: ES|QL
|
||||
type: enhancement
|
||||
issues: []
|
|
@ -185,6 +185,7 @@ public class TransportVersions {
|
|||
public static final TransportVersion ESQL_THREAD_NAME_IN_DRIVER_PROFILE = def(9_027_0_00);
|
||||
public static final TransportVersion INFERENCE_CONTEXT = def(9_028_0_00);
|
||||
public static final TransportVersion ML_INFERENCE_DEEPSEEK = def(9_029_00_0);
|
||||
public static final TransportVersion ESQL_FAILURE_FROM_REMOTE = def(9_030_00_0);
|
||||
|
||||
/*
|
||||
* STOP! READ THIS FIRST! No, really,
|
||||
|
|
|
@ -14,18 +14,22 @@ import org.elasticsearch.client.Request;
|
|||
import org.elasticsearch.client.Response;
|
||||
import org.elasticsearch.client.ResponseException;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.support.XContentMapValues;
|
||||
import org.elasticsearch.test.cluster.ElasticsearchCluster;
|
||||
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
|
||||
import org.elasticsearch.test.rest.ESRestTestCase;
|
||||
import org.junit.ClassRule;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.lessThanOrEqualTo;
|
||||
|
||||
public class EsqlPartialResultsIT extends ESRestTestCase {
|
||||
|
@ -97,6 +101,7 @@ public class EsqlPartialResultsIT extends ESRestTestCase {
|
|||
return okIds;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testPartialResult() throws Exception {
|
||||
Set<String> okIds = populateIndices();
|
||||
String query = """
|
||||
|
@ -113,11 +118,30 @@ public class EsqlPartialResultsIT extends ESRestTestCase {
|
|||
}
|
||||
Response resp = client().performRequest(request);
|
||||
Map<String, Object> results = entityAsMap(resp);
|
||||
logger.info("--> results {}", results);
|
||||
assertThat(results.get("is_partial"), equalTo(true));
|
||||
List<?> columns = (List<?>) results.get("columns");
|
||||
assertThat(columns, equalTo(List.of(Map.of("name", "fail_me", "type", "long"), Map.of("name", "v", "type", "long"))));
|
||||
List<?> values = (List<?>) results.get("values");
|
||||
assertThat(values.size(), lessThanOrEqualTo(okIds.size()));
|
||||
Map<String, Object> localInfo = (Map<String, Object>) XContentMapValues.extractValue(
|
||||
results,
|
||||
"_clusters",
|
||||
"details",
|
||||
"(local)"
|
||||
);
|
||||
assertNotNull(localInfo);
|
||||
assertThat(XContentMapValues.extractValue(localInfo, "_shards", "successful"), equalTo(0));
|
||||
assertThat(
|
||||
XContentMapValues.extractValue(localInfo, "_shards", "failed"),
|
||||
equalTo(XContentMapValues.extractValue(localInfo, "_shards", "total"))
|
||||
);
|
||||
List<Map<String, Object>> failures = (List<Map<String, Object>>) XContentMapValues.extractValue(localInfo, "failures");
|
||||
assertThat(failures, hasSize(1));
|
||||
assertThat(
|
||||
failures.get(0).get("reason"),
|
||||
equalTo(Map.of("type", "illegal_state_exception", "reason", "Accessing failing field"))
|
||||
);
|
||||
}
|
||||
// allow_partial_results = false
|
||||
{
|
||||
|
@ -133,5 +157,81 @@ public class EsqlPartialResultsIT extends ESRestTestCase {
|
|||
assertThat(resp.getStatusLine().getStatusCode(), equalTo(500));
|
||||
assertThat(EntityUtils.toString(resp.getEntity()), containsString("Accessing failing field"));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
 * Runs an ES|QL query against {@code *:ok-index,*:failing-index} through the remote cluster alias
 * registered by {@link #setupRemoteClusters()} and checks that the response is marked partial and that
 * the {@code _clusters.details.cluster_one} section reports the shard failure.
 * <p>
 * The remote cluster registration is removed again in the {@code finally} block, whether or not the
 * assertions pass.
 */
@SuppressWarnings("unchecked")
|
||||
public void testFailureFromRemote() throws Exception {
|
||||
setupRemoteClusters();
|
||||
try {
|
||||
Set<String> okIds = populateIndices();
|
||||
String query = """
|
||||
{
|
||||
"query": "FROM *:ok-index,*:failing-index | LIMIT 100 | KEEP fail_me,v"
|
||||
}
|
||||
""";
|
||||
// allow_partial_results = true
|
||||
// NOTE(review): the parameter is only sent explicitly on some runs (randomBoolean), yet is_partial is
|
||||
// always asserted below, so partial results are presumably enabled by default here - TODO confirm.
|
||||
Request request = new Request("POST", "/_query");
|
||||
request.setJsonEntity(query);
|
||||
if (randomBoolean()) {
|
||||
request.addParameter("allow_partial_results", "true");
|
||||
}
|
||||
Response resp = client().performRequest(request);
|
||||
Map<String, Object> results = entityAsMap(resp);
|
||||
logger.info("--> results {}", results);
|
||||
assertThat(results.get("is_partial"), equalTo(true));
|
||||
List<?> columns = (List<?>) results.get("columns");
|
||||
assertThat(columns, equalTo(List.of(Map.of("name", "fail_me", "type", "long"), Map.of("name", "v", "type", "long"))));
|
||||
List<?> values = (List<?>) results.get("values");
|
||||
// Only an upper bound is asserted: the number of returned rows may be anything up to okIds.size().
|
||||
assertThat(values.size(), lessThanOrEqualTo(okIds.size()));
|
||||
// Per-cluster execution details for the remote alias configured in setupRemoteClusters().
|
||||
Map<String, Object> remoteCluster = (Map<String, Object>) XContentMapValues.extractValue(
|
||||
results,
|
||||
"_clusters",
|
||||
"details",
|
||||
"cluster_one"
|
||||
);
|
||||
assertNotNull(remoteCluster);
|
||||
// Every shard of the remote cluster is expected to be reported as failed: successful == 0, failed == total.
|
||||
assertThat(XContentMapValues.extractValue(remoteCluster, "_shards", "successful"), equalTo(0));
|
||||
assertThat(
|
||||
XContentMapValues.extractValue(remoteCluster, "_shards", "failed"),
|
||||
equalTo(XContentMapValues.extractValue(remoteCluster, "_shards", "total"))
|
||||
);
|
||||
List<Map<String, Object>> failures = (List<Map<String, Object>>) XContentMapValues.extractValue(remoteCluster, "failures");
|
||||
// Exactly one failure entry is expected, carrying the exception type and message.
|
||||
assertThat(failures, hasSize(1));
|
||||
assertThat(
|
||||
failures.get(0).get("reason"),
|
||||
equalTo(Map.of("type", "illegal_state_exception", "reason", "Accessing failing field"))
|
||||
);
|
||||
} finally {
|
||||
removeRemoteCluster();
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Registers a remote cluster alias named {@code cluster_one} via a persistent cluster-settings update
 * ({@code PUT /_cluster/settings}). The seed address is taken from {@code cluster.getTransportEndpoints()}
 * and {@code skip_unavailable} is set to {@code false}.
 * <p>
 * NOTE(review): the value of {@code getTransportEndpoints()} is interpolated as a single JSON string inside
 * the {@code seeds} array. If it can contain several comma-separated addresses they would end up in one
 * seed entry - confirm the test cluster only exposes one endpoint.
 *
 * @throws IOException if the settings request fails
 */
private void setupRemoteClusters() throws IOException {
|
||||
String settings = String.format(Locale.ROOT, """
|
||||
{
|
||||
"persistent": {
|
||||
"cluster": {
|
||||
"remote": {
|
||||
"cluster_one": {
|
||||
"seeds": [ "%s" ],
|
||||
"skip_unavailable": false
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
""", cluster.getTransportEndpoints());
|
||||
Request request = new Request("PUT", "/_cluster/settings");
|
||||
request.setJsonEntity(settings);
|
||||
client().performRequest(request);
|
||||
}
|
||||
|
||||
/**
 * Undoes the remote cluster registration by sending {@code PUT /_cluster/settings} with the persistent
 * key {@code cluster.*} set to {@code null}.
 * <p>
 * Note that the wildcard is broader than the single {@code cluster.remote.cluster_one} entry: it targets
 * every persistent setting whose key matches {@code cluster.*}.
 *
 * @throws IOException if the settings request fails
 */
private void removeRemoteCluster() throws IOException {
|
||||
Request settingsRequest = new Request("PUT", "/_cluster/settings");
|
||||
settingsRequest.setJsonEntity("""
|
||||
{"persistent": { "cluster.*": null}}
|
||||
""");
|
||||
client().performRequest(settingsRequest);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -9,6 +9,7 @@ package org.elasticsearch.xpack.esql.action;
|
|||
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.ResourceNotFoundException;
|
||||
import org.elasticsearch.action.search.ShardSearchFailure;
|
||||
import org.elasticsearch.client.internal.Client;
|
||||
import org.elasticsearch.common.breaker.CircuitBreaker;
|
||||
import org.elasticsearch.common.breaker.CircuitBreakingException;
|
||||
|
@ -37,11 +38,13 @@ import java.util.stream.Stream;
|
|||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.empty;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.in;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.lessThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
|
||||
public class CrossClusterQueryWithPartialResultsIT extends AbstractCrossClusterTestCase {
|
||||
|
||||
|
@ -70,6 +73,14 @@ public class CrossClusterQueryWithPartialResultsIT extends AbstractCrossClusterT
|
|||
assertClusterPartial(resp, clusterAlias, cluster.okShards + cluster.failingShards, cluster.okShards);
|
||||
}
|
||||
|
||||
/**
 * Asserts that the execution info of the given cluster carries at least one failure and that the reason
 * of every reported failure contains the expected text.
 *
 * @param resp         the query response whose execution info is inspected
 * @param clusterAlias alias of the cluster to look up in the execution info
 * @param reason       substring that each failure's reason must contain
 */
private void assertClusterFailure(EsqlQueryResponse resp, String clusterAlias, String reason) {
|
||||
EsqlExecutionInfo.Cluster info = resp.getExecutionInfo().getCluster(clusterAlias);
|
||||
// An empty list would make the loop below pass vacuously, so require at least one failure first.
|
||||
assertThat(info.getFailures(), not(empty()));
|
||||
for (ShardSearchFailure f : info.getFailures()) {
|
||||
assertThat(f.reason(), containsString(reason));
|
||||
}
|
||||
}
|
||||
|
||||
private void assertClusterPartial(EsqlQueryResponse resp, String clusterAlias, int totalShards, int okShards) {
|
||||
EsqlExecutionInfo.Cluster clusterInfo = resp.getExecutionInfo().getCluster(clusterAlias);
|
||||
assertThat(clusterInfo.getTotalShards(), equalTo(totalShards));
|
||||
|
@ -83,6 +94,7 @@ public class CrossClusterQueryWithPartialResultsIT extends AbstractCrossClusterT
|
|||
assertThat(clusterInfo.getSuccessfulShards(), equalTo(numShards));
|
||||
assertThat(clusterInfo.getFailedShards(), equalTo(0));
|
||||
assertThat(clusterInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
|
||||
assertThat(clusterInfo.getFailures(), empty());
|
||||
}
|
||||
|
||||
public void testPartialResults() throws Exception {
|
||||
|
@ -110,10 +122,12 @@ public class CrossClusterQueryWithPartialResultsIT extends AbstractCrossClusterT
|
|||
assertTrue(returnedIds.add(id));
|
||||
assertThat(id, is(in(allIds)));
|
||||
}
|
||||
|
||||
assertClusterPartial(resp, LOCAL_CLUSTER, local);
|
||||
assertClusterPartial(resp, REMOTE_CLUSTER_1, remote1);
|
||||
assertClusterPartial(resp, REMOTE_CLUSTER_2, remote2);
|
||||
for (String cluster : List.of(LOCAL_CLUSTER, REMOTE_CLUSTER_1, REMOTE_CLUSTER_2)) {
|
||||
assertClusterFailure(resp, cluster, "Accessing failing field");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -139,6 +153,7 @@ public class CrossClusterQueryWithPartialResultsIT extends AbstractCrossClusterT
|
|||
assertClusterSuccess(resp, LOCAL_CLUSTER, local.okShards);
|
||||
assertClusterSuccess(resp, REMOTE_CLUSTER_1, remote1.okShards);
|
||||
assertClusterPartial(resp, REMOTE_CLUSTER_2, remote2.failingShards, 0);
|
||||
assertClusterFailure(resp, REMOTE_CLUSTER_2, "Accessing failing field");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -191,9 +206,9 @@ public class CrossClusterQueryWithPartialResultsIT extends AbstractCrossClusterT
|
|||
}
|
||||
assertThat(returnedIds, equalTo(Sets.union(local.okIds, remote1.okIds)));
|
||||
assertClusterSuccess(resp, LOCAL_CLUSTER, local.okShards);
|
||||
|
||||
EsqlExecutionInfo.Cluster remoteInfo = resp.getExecutionInfo().getCluster(REMOTE_CLUSTER_1);
|
||||
assertThat(remoteInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));
|
||||
assertClusterFailure(resp, REMOTE_CLUSTER_1, simulatedFailure.getMessage());
|
||||
}
|
||||
} finally {
|
||||
for (TransportService transportService : cluster(REMOTE_CLUSTER_1).getInstances(TransportService.class)) {
|
||||
|
@ -239,9 +254,9 @@ public class CrossClusterQueryWithPartialResultsIT extends AbstractCrossClusterT
|
|||
}
|
||||
assertThat(returnedIds, equalTo(local.okIds));
|
||||
assertClusterSuccess(resp, LOCAL_CLUSTER, local.okShards);
|
||||
|
||||
EsqlExecutionInfo.Cluster remoteInfo = resp.getExecutionInfo().getCluster(REMOTE_CLUSTER_1);
|
||||
assertThat(remoteInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));
|
||||
assertClusterFailure(resp, REMOTE_CLUSTER_1, simulatedFailure.getMessage());
|
||||
}
|
||||
} finally {
|
||||
for (TransportService transportService : cluster(REMOTE_CLUSTER_1).getInstances(TransportService.class)) {
|
||||
|
@ -286,8 +301,7 @@ public class CrossClusterQueryWithPartialResultsIT extends AbstractCrossClusterT
|
|||
assertThat(returnedIds, equalTo(remote1.okIds));
|
||||
EsqlExecutionInfo.Cluster localInfo = resp.getExecutionInfo().getCluster(LOCAL_CLUSTER);
|
||||
assertThat(localInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));
|
||||
|
||||
assertClusterSuccess(resp, REMOTE_CLUSTER_1, remote1.okShards);
|
||||
assertClusterFailure(resp, LOCAL_CLUSTER, simulatedFailure.getMessage());
|
||||
}
|
||||
} finally {
|
||||
for (TransportService transportService : cluster(LOCAL_CLUSTER).getInstances(TransportService.class)) {
|
||||
|
|
|
@ -14,6 +14,7 @@ import org.elasticsearch.core.TimeValue;
|
|||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.FailingFieldPlugin;
|
||||
import org.elasticsearch.transport.RemoteClusterService;
|
||||
import org.elasticsearch.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.xpack.esql.EsqlTestUtils;
|
||||
|
@ -26,9 +27,12 @@ import java.util.List;
|
|||
import java.util.Set;
|
||||
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.empty;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.in;
|
||||
import static org.hamcrest.Matchers.lessThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
|
||||
/**
|
||||
* Make sure the failures on the data node come back as failures over the wire.
|
||||
|
@ -121,6 +125,10 @@ public class EsqlNodeFailureIT extends AbstractEsqlIntegTestCase {
|
|||
assertThat(id, in(okIds));
|
||||
assertTrue(actualIds.add(id));
|
||||
}
|
||||
EsqlExecutionInfo.Cluster localInfo = resp.getExecutionInfo().getCluster(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY);
|
||||
assertThat(localInfo.getFailures(), not(empty()));
|
||||
assertThat(localInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));
|
||||
assertThat(localInfo.getFailures().get(0).reason(), containsString("Accessing failing field"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -163,7 +163,8 @@ final class ClusterComputeHandler implements TransportRequestHandler<ClusterComp
|
|||
builder.setTook(executionInfo.tookSoFar());
|
||||
}
|
||||
if (v.getStatus() == EsqlExecutionInfo.Cluster.Status.RUNNING) {
|
||||
if (executionInfo.isStopped() || resp.failedShards > 0) {
|
||||
builder.setFailures(resp.failures);
|
||||
if (executionInfo.isStopped() || resp.failedShards > 0 || resp.failures.isEmpty() == false) {
|
||||
builder.setStatus(EsqlExecutionInfo.Cluster.Status.PARTIAL);
|
||||
} else {
|
||||
builder.setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL);
|
||||
|
@ -251,7 +252,7 @@ final class ClusterComputeHandler implements TransportRequestHandler<ClusterComp
|
|||
try (var computeListener = new ComputeListener(transportService.getThreadPool(), cancelQueryOnFailure, listener.map(profiles -> {
|
||||
final TimeValue took = TimeValue.timeValueNanos(System.nanoTime() - startTimeInNanos);
|
||||
final ComputeResponse r = finalResponse.get();
|
||||
return new ComputeResponse(profiles, took, r.totalShards, r.successfulShards, r.skippedShards, r.failedShards);
|
||||
return new ComputeResponse(profiles, took, r.totalShards, r.successfulShards, r.skippedShards, r.failedShards, r.failures);
|
||||
}))) {
|
||||
var exchangeSource = new ExchangeSourceHandler(
|
||||
configuration.pragmas().exchangeBufferSize(),
|
||||
|
|
|
@ -8,6 +8,7 @@
|
|||
package org.elasticsearch.xpack.esql.plugin;
|
||||
|
||||
import org.elasticsearch.TransportVersions;
|
||||
import org.elasticsearch.action.search.ShardSearchFailure;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.compute.operator.DriverProfile;
|
||||
|
@ -29,9 +30,10 @@ final class ComputeResponse extends TransportResponse {
|
|||
public final int successfulShards;
|
||||
public final int skippedShards;
|
||||
public final int failedShards;
|
||||
public final List<ShardSearchFailure> failures;
|
||||
|
||||
ComputeResponse(List<DriverProfile> profiles) {
|
||||
this(profiles, null, null, null, null, null);
|
||||
this(profiles, null, null, null, null, null, List.of());
|
||||
}
|
||||
|
||||
ComputeResponse(
|
||||
|
@ -40,7 +42,8 @@ final class ComputeResponse extends TransportResponse {
|
|||
Integer totalShards,
|
||||
Integer successfulShards,
|
||||
Integer skippedShards,
|
||||
Integer failedShards
|
||||
Integer failedShards,
|
||||
List<ShardSearchFailure> failures
|
||||
) {
|
||||
this.profiles = profiles;
|
||||
this.took = took;
|
||||
|
@ -48,6 +51,7 @@ final class ComputeResponse extends TransportResponse {
|
|||
this.successfulShards = successfulShards == null ? 0 : successfulShards.intValue();
|
||||
this.skippedShards = skippedShards == null ? 0 : skippedShards.intValue();
|
||||
this.failedShards = failedShards == null ? 0 : failedShards.intValue();
|
||||
this.failures = failures;
|
||||
}
|
||||
|
||||
ComputeResponse(StreamInput in) throws IOException {
|
||||
|
@ -74,6 +78,11 @@ final class ComputeResponse extends TransportResponse {
|
|||
this.skippedShards = 0;
|
||||
this.failedShards = 0;
|
||||
}
|
||||
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_FAILURE_FROM_REMOTE)) {
|
||||
this.failures = in.readCollectionAsImmutableList(ShardSearchFailure::readShardSearchFailure);
|
||||
} else {
|
||||
this.failures = List.of();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -93,6 +102,9 @@ final class ComputeResponse extends TransportResponse {
|
|||
out.writeVInt(skippedShards);
|
||||
out.writeVInt(failedShards);
|
||||
}
|
||||
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_FAILURE_FROM_REMOTE)) {
|
||||
out.writeCollection(failures, (o, v) -> v.writeTo(o));
|
||||
}
|
||||
}
|
||||
|
||||
public List<DriverProfile> getProfiles() {
|
||||
|
@ -118,4 +130,8 @@ final class ComputeResponse extends TransportResponse {
|
|||
public int getFailedShards() {
|
||||
return failedShards;
|
||||
}
|
||||
|
||||
/**
 * Returns the shard failures carried by this response. The list is returned as stored in the
 * {@code failures} field, without copying.
 */
public List<ShardSearchFailure> getFailures() {
|
||||
return failures;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -268,6 +268,7 @@ public class ComputeService {
|
|||
.setSuccessfulShards(r.getSuccessfulShards())
|
||||
.setSkippedShards(r.getSkippedShards())
|
||||
.setFailedShards(r.getFailedShards())
|
||||
.setFailures(r.failures)
|
||||
.build()
|
||||
);
|
||||
dataNodesListener.onResponse(r.getProfiles());
|
||||
|
|
|
@ -101,6 +101,7 @@ final class DataNodeComputeHandler implements TransportRequestHandler<DataNodeRe
|
|||
new DataNodeRequestSender(
|
||||
transportService,
|
||||
esqlExecutor,
|
||||
clusterAlias,
|
||||
parentTask,
|
||||
configuration.allowPartialResults(),
|
||||
configuration.pragmas().maxConcurrentNodesPerCluster()
|
||||
|
|
|
@ -15,6 +15,7 @@ import org.elasticsearch.action.OriginalIndices;
|
|||
import org.elasticsearch.action.search.SearchShardsGroup;
|
||||
import org.elasticsearch.action.search.SearchShardsRequest;
|
||||
import org.elasticsearch.action.search.SearchShardsResponse;
|
||||
import org.elasticsearch.action.search.ShardSearchFailure;
|
||||
import org.elasticsearch.action.support.TransportActions;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
|
||||
|
@ -26,6 +27,7 @@ import org.elasticsearch.core.TimeValue;
|
|||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.query.QueryBuilder;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.search.SearchShardTarget;
|
||||
import org.elasticsearch.search.internal.AliasFilter;
|
||||
import org.elasticsearch.tasks.CancellableTask;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
|
@ -73,6 +75,7 @@ abstract class DataNodeRequestSender {
|
|||
|
||||
private final TransportService transportService;
|
||||
private final Executor esqlExecutor;
|
||||
private final String clusterAlias;
|
||||
private final CancellableTask rootTask;
|
||||
private final boolean allowPartialResults;
|
||||
private final Semaphore concurrentRequests;
|
||||
|
@ -87,12 +90,14 @@ abstract class DataNodeRequestSender {
|
|||
DataNodeRequestSender(
|
||||
TransportService transportService,
|
||||
Executor esqlExecutor,
|
||||
String clusterAlias,
|
||||
CancellableTask rootTask,
|
||||
boolean allowPartialResults,
|
||||
int concurrentRequests
|
||||
) {
|
||||
this.transportService = transportService;
|
||||
this.esqlExecutor = esqlExecutor;
|
||||
this.clusterAlias = clusterAlias;
|
||||
this.rootTask = rootTask;
|
||||
this.allowPartialResults = allowPartialResults;
|
||||
this.concurrentRequests = concurrentRequests > 0 ? new Semaphore(concurrentRequests) : null;
|
||||
|
@ -115,7 +120,8 @@ abstract class DataNodeRequestSender {
|
|||
targetShards.totalShards(),
|
||||
targetShards.totalShards() - shardFailures.size() - skippedShards.get(),
|
||||
targetShards.skippedShards() + skippedShards.get(),
|
||||
shardFailures.size()
|
||||
shardFailures.size(),
|
||||
selectFailures()
|
||||
);
|
||||
}))) {
|
||||
for (TargetShard shard : targetShards.shards.values()) {
|
||||
|
@ -208,6 +214,27 @@ abstract class DataNodeRequestSender {
|
|||
}
|
||||
}
|
||||
|
||||
/**
 * Builds the list of shard failures to attach to the response from the recorded {@code shardFailures}.
 * <ul>
 * <li>Failures that unwrap to a {@link TaskCancelledException} are skipped in the main pass.</li>
 * <li>The same exception instance (compared by identity) is reported only once.</li>
 * <li>At most five failures are selected; each is tagged with its shard id and {@code clusterAlias}.</li>
 * <li>If nothing was selected but failures were recorded (i.e. all of them were cancellations), one
 * arbitrary recorded failure is returned, without a shard target.</li>
 * </ul>
 *
 * @return the selected failures; empty when no shard failure was recorded
 */
private List<ShardSearchFailure> selectFailures() {
|
||||
// Only expected to be called when no failure has already been reported.
|
||||
assert reportedFailure == false;
|
||||
final List<ShardSearchFailure> failures = new ArrayList<>();
|
||||
// Identity-based set: de-duplicates by exception instance rather than by equals().
|
||||
final Set<Exception> seen = Collections.newSetFromMap(new IdentityHashMap<>());
|
||||
for (Map.Entry<ShardId, ShardFailure> e : shardFailures.entrySet()) {
|
||||
final ShardFailure failure = e.getValue();
|
||||
if (ExceptionsHelper.unwrap(failure.failure(), TaskCancelledException.class) != null) {
|
||||
continue;
|
||||
}
|
||||
// seen.add runs before the size check, so exceptions past the cap are still recorded as seen but not reported.
|
||||
if (seen.add(failure.failure) && failures.size() < 5) {
|
||||
// The first SearchShardTarget argument (presumably the node id - TODO confirm) is left null.
|
||||
failures.add(new ShardSearchFailure(failure.failure, new SearchShardTarget(null, e.getKey(), clusterAlias)));
|
||||
}
|
||||
}
|
||||
// pick any cancellation exception
|
||||
if (failures.isEmpty() && shardFailures.isEmpty() == false) {
|
||||
final ShardFailure any = shardFailures.values().iterator().next();
|
||||
failures.add(new ShardSearchFailure(any.failure));
|
||||
}
|
||||
return failures;
|
||||
}
|
||||
|
||||
private void sendOneNodeRequest(TargetShards targetShards, ComputeListener computeListener, NodeRequest request) {
|
||||
final ActionListener<List<DriverProfile>> listener = computeListener.acquireCompute();
|
||||
sendRequest(request.node, request.shardIds, request.aliasFilters, new NodeListener() {
|
||||
|
|
|
@ -62,6 +62,7 @@ import static org.elasticsearch.cluster.node.DiscoveryNodeRole.DATA_WARM_NODE_RO
|
|||
import static org.elasticsearch.xpack.esql.plugin.DataNodeRequestSender.NodeRequest;
|
||||
import static org.hamcrest.Matchers.anyOf;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.empty;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.in;
|
||||
|
@ -145,6 +146,10 @@ public class DataNodeRequestSenderTests extends ComputeTestCase {
|
|||
assertThat(resp.totalShards, equalTo(3));
|
||||
assertThat(resp.failedShards, equalTo(1));
|
||||
assertThat(resp.successfulShards, equalTo(2));
|
||||
assertThat(resp.failures, not(empty()));
|
||||
assertNotNull(resp.failures.get(0).shard());
|
||||
assertThat(resp.failures.get(0).shard().getShardId(), equalTo(shard3));
|
||||
assertThat(resp.failures.get(0).reason(), containsString("no shard copies found"));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -453,6 +458,7 @@ public class DataNodeRequestSenderTests extends ComputeTestCase {
|
|||
DataNodeRequestSender requestSender = new DataNodeRequestSender(
|
||||
transportService,
|
||||
executor,
|
||||
"",
|
||||
task,
|
||||
allowPartialResults,
|
||||
concurrentRequests
|
||||
|
|
Loading…
Reference in New Issue