Add query plans to profile output (#128828)

This commit is contained in:
Ievgen Degtiarenko 2025-06-25 10:50:04 +02:00 committed by GitHub
parent 8b62a55f2f
commit 56d5009924
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 208 additions and 94 deletions

View File

@ -322,6 +322,7 @@ public class TransportVersions {
public static final TransportVersion CLUSTER_STATE_PROJECTS_SETTINGS = def(9_108_0_00);
public static final TransportVersion ML_INFERENCE_ELASTIC_DENSE_TEXT_EMBEDDINGS_ADDED = def(9_109_00_0);
public static final TransportVersion ML_INFERENCE_COHERE_API_VERSION = def(9_110_0_00);
public static final TransportVersion ESQL_PROFILE_INCLUDE_PLAN = def(9_111_0_00);
/*
* STOP! READ THIS FIRST! No, really,

View File

@ -2713,9 +2713,11 @@ public abstract class ESRestTestCase extends ESTestCase {
}
protected static MapMatcher getProfileMatcher() {
return matchesMap().entry("query", instanceOf(Map.class))
return matchesMap() //
.entry("query", instanceOf(Map.class))
.entry("planning", instanceOf(Map.class))
.entry("drivers", instanceOf(List.class));
.entry("drivers", instanceOf(List.class))
.entry("plans", instanceOf(List.class));
}
protected static MapMatcher getResultMatcher(boolean includeMetadata, boolean includePartial, boolean includeDocumentsFound) {

View File

@ -7,6 +7,7 @@
package org.elasticsearch.compute.operator;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
@ -24,23 +25,34 @@ import java.util.concurrent.atomic.AtomicLong;
* <strong>roughly</strong> the number of documents times the number of
* fields per document. Except {@code null} values don't count.
* And multivalued fields count as many times as there are values.
* @param collectedProfiles {@link DriverProfile}s from each driver. These are fairly cheap to build but
* @param driverProfiles {@link DriverProfile}s from each driver. These are fairly cheap to build but
* not free so this will be empty if the {@code profile} option was not set in
* the request.
*/
public record DriverCompletionInfo(long documentsFound, long valuesLoaded, List<DriverProfile> collectedProfiles) implements Writeable {
public record DriverCompletionInfo(
long documentsFound,
long valuesLoaded,
List<DriverProfile> driverProfiles,
List<PlanProfile> planProfiles
) implements Writeable {
/**
* Completion info we use when we didn't properly complete any drivers.
* Usually this is returned with an error, but it's also used when receiving
* responses from very old nodes.
*/
public static final DriverCompletionInfo EMPTY = new DriverCompletionInfo(0, 0, List.of());
public static final DriverCompletionInfo EMPTY = new DriverCompletionInfo(0, 0, List.of(), List.of());
/**
* Build a {@link DriverCompletionInfo} for many drivers including their profile output.
*/
public static DriverCompletionInfo includingProfiles(List<Driver> drivers) {
public static DriverCompletionInfo includingProfiles(
List<Driver> drivers,
String description,
String clusterName,
String nodeName,
String planTree
) {
long documentsFound = 0;
long valuesLoaded = 0;
List<DriverProfile> collectedProfiles = new ArrayList<>(drivers.size());
@ -52,7 +64,12 @@ public record DriverCompletionInfo(long documentsFound, long valuesLoaded, List<
}
collectedProfiles.add(p);
}
return new DriverCompletionInfo(documentsFound, valuesLoaded, collectedProfiles);
return new DriverCompletionInfo(
documentsFound,
valuesLoaded,
collectedProfiles,
List.of(new PlanProfile(description, clusterName, nodeName, planTree))
);
}
/**
@ -69,33 +86,45 @@ public record DriverCompletionInfo(long documentsFound, long valuesLoaded, List<
valuesLoaded += o.valuesLoaded();
}
}
return new DriverCompletionInfo(documentsFound, valuesLoaded, List.of());
return new DriverCompletionInfo(documentsFound, valuesLoaded, List.of(), List.of());
}
public DriverCompletionInfo(StreamInput in) throws IOException {
this(in.readVLong(), in.readVLong(), in.readCollectionAsImmutableList(DriverProfile::readFrom));
public static DriverCompletionInfo readFrom(StreamInput in) throws IOException {
return new DriverCompletionInfo(
in.readVLong(),
in.readVLong(),
in.readCollectionAsImmutableList(DriverProfile::readFrom),
in.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_INCLUDE_PLAN)
? in.readCollectionAsImmutableList(PlanProfile::readFrom)
: List.of()
);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(documentsFound);
out.writeVLong(valuesLoaded);
out.writeCollection(collectedProfiles, (o, v) -> v.writeTo(o));
out.writeCollection(driverProfiles);
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_INCLUDE_PLAN)) {
out.writeCollection(planProfiles);
}
}
public static class Accumulator {
private long documentsFound;
private long valuesLoaded;
private final List<DriverProfile> collectedProfiles = new ArrayList<>();
private final List<DriverProfile> driverProfiles = new ArrayList<>();
private final List<PlanProfile> planProfiles = new ArrayList<>();
public void accumulate(DriverCompletionInfo info) {
this.documentsFound += info.documentsFound;
this.valuesLoaded += info.valuesLoaded;
this.collectedProfiles.addAll(info.collectedProfiles);
this.driverProfiles.addAll(info.driverProfiles);
this.planProfiles.addAll(info.planProfiles);
}
public DriverCompletionInfo finish() {
return new DriverCompletionInfo(documentsFound, valuesLoaded, collectedProfiles);
return new DriverCompletionInfo(documentsFound, valuesLoaded, driverProfiles, planProfiles);
}
}
@ -103,15 +132,17 @@ public record DriverCompletionInfo(long documentsFound, long valuesLoaded, List<
private final AtomicLong documentsFound = new AtomicLong();
private final AtomicLong valuesLoaded = new AtomicLong();
private final List<DriverProfile> collectedProfiles = Collections.synchronizedList(new ArrayList<>());
private final List<PlanProfile> planProfiles = Collections.synchronizedList(new ArrayList<>());
public void accumulate(DriverCompletionInfo info) {
this.documentsFound.addAndGet(info.documentsFound);
this.valuesLoaded.addAndGet(info.valuesLoaded);
this.collectedProfiles.addAll(info.collectedProfiles);
this.collectedProfiles.addAll(info.driverProfiles);
this.planProfiles.addAll(info.planProfiles);
}
public DriverCompletionInfo finish() {
return new DriverCompletionInfo(documentsFound.get(), valuesLoaded.get(), collectedProfiles);
return new DriverCompletionInfo(documentsFound.get(), valuesLoaded.get(), collectedProfiles, planProfiles);
}
}
}

View File

@ -0,0 +1,41 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.compute.operator;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import java.io.IOException;
public record PlanProfile(String description, String clusterName, String nodeName, String planTree) implements Writeable, ToXContentObject {
public static PlanProfile readFrom(StreamInput in) throws IOException {
return new PlanProfile(in.readString(), in.readString(), in.readString(), in.readString());
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(description);
out.writeString(clusterName);
out.writeString(nodeName);
out.writeString(planTree);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject()
.field("description", description)
.field("cluster_name", clusterName)
.field("node_name", nodeName)
.field("plan", planTree)
.endObject();
}
}

View File

@ -317,7 +317,9 @@ public class PushQueriesIT extends ESRestTestCase {
result,
getResultMatcher(result).entry(
"profile",
matchesMap().entry("drivers", instanceOf(List.class))
matchesMap() //
.entry("drivers", instanceOf(List.class))
.entry("plans", instanceOf(List.class))
.entry("planning", matchesMap().extraOk())
.entry("query", matchesMap().extraOk())
),

View File

@ -112,7 +112,9 @@ public class StoredFieldsSequentialIT extends ESRestTestCase {
matchesMap().entry("documents_found", documentsFound)
.entry(
"profile",
matchesMap().entry("drivers", instanceOf(List.class))
matchesMap() //
.entry("drivers", instanceOf(List.class))
.entry("plans", instanceOf(List.class))
.entry("planning", matchesMap().extraOk())
.entry("query", matchesMap().extraOk())
)

View File

@ -20,6 +20,7 @@ import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BlockStreamInput;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.DriverProfile;
import org.elasticsearch.compute.operator.PlanProfile;
import org.elasticsearch.core.AbstractRefCounted;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
@ -279,6 +280,7 @@ public class EsqlQueryResponse extends org.elasticsearch.xpack.core.esql.action.
return b;
}));
content.add(ChunkedToXContentHelper.array("drivers", profile.drivers.iterator(), params));
content.add(ChunkedToXContentHelper.array("plans", profile.plans.iterator()));
content.add(ChunkedToXContentHelper.endObject());
}
content.add(ChunkedToXContentHelper.endObject());
@ -387,15 +389,23 @@ public class EsqlQueryResponse extends org.elasticsearch.xpack.core.esql.action.
return esqlResponse;
}
public record Profile(List<DriverProfile> drivers) implements Writeable {
public record Profile(List<DriverProfile> drivers, List<PlanProfile> plans) implements Writeable {
public static Profile readFrom(StreamInput in) throws IOException {
return new Profile(in.readCollectionAsImmutableList(DriverProfile::readFrom));
return new Profile(
in.readCollectionAsImmutableList(DriverProfile::readFrom),
in.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_INCLUDE_PLAN)
? in.readCollectionAsImmutableList(PlanProfile::readFrom)
: List.of()
);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeCollection(drivers);
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_INCLUDE_PLAN)) {
out.writeCollection(plans);
}
}
}
}

View File

@ -59,10 +59,10 @@ final class ComputeResponse extends TransportResponse {
ComputeResponse(StreamInput in) throws IOException {
if (in.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED)) {
completionInfo = new DriverCompletionInfo(in);
completionInfo = DriverCompletionInfo.readFrom(in);
} else if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
if (in.readBoolean()) {
completionInfo = new DriverCompletionInfo(0, 0, in.readCollectionAsImmutableList(DriverProfile::readFrom));
completionInfo = new DriverCompletionInfo(0, 0, in.readCollectionAsImmutableList(DriverProfile::readFrom), List.of());
} else {
completionInfo = DriverCompletionInfo.EMPTY;
}
@ -96,7 +96,7 @@ final class ComputeResponse extends TransportResponse {
completionInfo.writeTo(out);
} else if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
out.writeBoolean(true);
out.writeCollection(completionInfo.collectedProfiles());
out.writeCollection(completionInfo.driverProfiles());
}
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) {
out.writeOptionalTimeValue(took);

View File

@ -19,7 +19,6 @@ import org.elasticsearch.common.util.concurrent.RunOnce;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.lucene.DataPartitioning;
import org.elasticsearch.compute.operator.Driver;
import org.elasticsearch.compute.operator.DriverCompletionInfo;
import org.elasticsearch.compute.operator.DriverTaskRunner;
import org.elasticsearch.compute.operator.FailureCollector;
@ -194,7 +193,7 @@ public class ComputeService {
List<PhysicalPlan> subplans = subplansAndMainPlan.v1();
// we have no sub plans, so we can just execute the given plan
if (subplans == null || subplans.size() == 0) {
if (subplans == null || subplans.isEmpty()) {
executePlan(sessionId, rootTask, physicalPlan, configuration, foldContext, execInfo, null, listener, null);
return;
}
@ -230,7 +229,6 @@ public class ComputeService {
);
Runnable cancelQueryOnFailure = cancelQueryOnFailure(rootTask);
PhysicalPlan finalMainPlan = mainPlan;
try (
ComputeListener localListener = new ComputeListener(
@ -238,11 +236,11 @@ public class ComputeService {
cancelQueryOnFailure,
finalListener.map(profiles -> {
execInfo.markEndQuery();
return new Result(finalMainPlan.output(), collectedPages, profiles, execInfo);
return new Result(mainPlan.output(), collectedPages, profiles, execInfo);
})
)
) {
runCompute(rootTask, computeContext, finalMainPlan, localListener.acquireCompute());
runCompute(rootTask, computeContext, mainPlan, localListener.acquireCompute());
for (int i = 0; i < subplans.size(); i++) {
var subplan = subplans.get(i);
@ -539,9 +537,7 @@ public class ComputeService {
@Override
public SourceProvider createSourceProvider() {
final Supplier<SourceProvider> supplier = () -> super.createSourceProvider();
return new ReinitializingSourceProvider(supplier);
return new ReinitializingSourceProvider(super::createSourceProvider);
}
};
contexts.add(
@ -554,7 +550,6 @@ public class ComputeService {
searchService.getIndicesService().getAnalysis(),
defaultDataPartitioning
);
final List<Driver> drivers;
try {
LocalExecutionPlanner planner = new LocalExecutionPlanner(
context.sessionId(),
@ -575,37 +570,40 @@ public class ComputeService {
LOGGER.debug("Received physical plan:\n{}", plan);
plan = PlannerUtils.localPlan(context.searchExecutionContexts(), context.configuration(), context.foldCtx(), plan);
var localPlan = PlannerUtils.localPlan(context.searchExecutionContexts(), context.configuration(), context.foldCtx(), plan);
// the planner will also set the driver parallelism in LocalExecutionPlanner.LocalExecutionPlan (used down below)
// it's doing this in the planning of EsQueryExec (the source of the data)
// see also EsPhysicalOperationProviders.sourcePhysicalOperation
LocalExecutionPlanner.LocalExecutionPlan localExecutionPlan = planner.plan(context.description(), context.foldCtx(), plan);
LocalExecutionPlanner.LocalExecutionPlan localExecutionPlan = planner.plan(context.description(), context.foldCtx(), localPlan);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Local execution plan:\n{}", localExecutionPlan.describe());
}
drivers = localExecutionPlan.createDrivers(context.sessionId());
var drivers = localExecutionPlan.createDrivers(context.sessionId());
if (drivers.isEmpty()) {
throw new IllegalStateException("no drivers created");
}
LOGGER.debug("using {} drivers", drivers.size());
driverRunner.executeDrivers(
task,
drivers,
transportService.getThreadPool().executor(ESQL_WORKER_THREAD_POOL_NAME),
ActionListener.releaseAfter(listener.map(ignored -> {
if (context.configuration().profile()) {
return DriverCompletionInfo.includingProfiles(
drivers,
context.description(),
clusterService.getClusterName().value(),
transportService.getLocalNode().getName(),
localPlan.toString()
);
} else {
return DriverCompletionInfo.excludingProfiles(drivers);
}
}), () -> Releasables.close(drivers))
);
} catch (Exception e) {
listener.onFailure(e);
return;
}
ActionListener<Void> listenerCollectingStatus = listener.map(ignored -> {
if (context.configuration().profile()) {
return DriverCompletionInfo.includingProfiles(drivers);
} else {
return DriverCompletionInfo.excludingProfiles(drivers);
}
});
listenerCollectingStatus = ActionListener.releaseAfter(listenerCollectingStatus, () -> Releasables.close(drivers));
driverRunner.executeDrivers(
task,
drivers,
transportService.getThreadPool().executor(ESQL_WORKER_THREAD_POOL_NAME),
listenerCollectingStatus
);
}
static PhysicalPlan reductionPlan(ExchangeSinkExec plan, boolean enable) {

View File

@ -15,6 +15,7 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.transport.TransportResponse;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.TransportVersions.ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED;
@ -33,12 +34,12 @@ final class DataNodeComputeResponse extends TransportResponse {
DataNodeComputeResponse(StreamInput in) throws IOException {
if (in.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED)) {
this.completionInfo = new DriverCompletionInfo(in);
this.completionInfo = DriverCompletionInfo.readFrom(in);
this.shardLevelFailures = in.readMap(ShardId::new, StreamInput::readException);
return;
}
if (DataNodeComputeHandler.supportShardLevelRetryFailure(in.getTransportVersion())) {
this.completionInfo = new DriverCompletionInfo(0, 0, in.readCollectionAsImmutableList(DriverProfile::readFrom));
this.completionInfo = new DriverCompletionInfo(0, 0, in.readCollectionAsImmutableList(DriverProfile::readFrom), List.of());
this.shardLevelFailures = in.readMap(ShardId::new, StreamInput::readException);
return;
}
@ -54,7 +55,7 @@ final class DataNodeComputeResponse extends TransportResponse {
return;
}
if (DataNodeComputeHandler.supportShardLevelRetryFailure(out.getTransportVersion())) {
out.writeCollection(completionInfo.collectedProfiles(), (o, v) -> v.writeTo(o));
out.writeCollection(completionInfo.driverProfiles());
out.writeMap(shardLevelFailures, (o, v) -> v.writeTo(o), StreamOutput::writeException);
return;
}

View File

@ -339,7 +339,7 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
return new ColumnInfoImpl(c.name(), c.dataType().outputType(), originalTypes);
}).toList();
EsqlQueryResponse.Profile profile = configuration.profile()
? new EsqlQueryResponse.Profile(result.completionInfo().collectedProfiles())
? new EsqlQueryResponse.Profile(result.completionInfo().driverProfiles(), result.completionInfo().planProfiles())
: null;
threadPool.getThreadContext().addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_IS_RUNNING_HEADER, "?0");
if (task instanceof EsqlQueryTask asyncTask && request.keepOnCompletion()) {

View File

@ -12,8 +12,8 @@ import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.compute.operator.AbstractPageMappingOperator;
import org.elasticsearch.compute.operator.DriverProfile;
import org.elasticsearch.compute.operator.DriverSleeps;
import org.elasticsearch.compute.operator.Operator;
import org.elasticsearch.compute.operator.OperatorStatus;
import org.elasticsearch.compute.operator.PlanProfile;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import java.util.List;
@ -26,12 +26,14 @@ public class EsqlQueryResponseProfileTests extends AbstractWireSerializingTestCa
@Override
protected EsqlQueryResponse.Profile createTestInstance() {
return new EsqlQueryResponse.Profile(randomDriverProfiles());
return new EsqlQueryResponse.Profile(randomDriverProfiles(), randomPlanProfiles());
}
@Override
protected EsqlQueryResponse.Profile mutateInstance(EsqlQueryResponse.Profile instance) {
return new EsqlQueryResponse.Profile(randomValueOtherThan(instance.drivers(), this::randomDriverProfiles));
return randomBoolean()
? new EsqlQueryResponse.Profile(randomValueOtherThan(instance.drivers(), this::randomDriverProfiles), instance.plans())
: new EsqlQueryResponse.Profile(instance.drivers(), randomValueOtherThan(instance.plans(), this::randomPlanProfiles));
}
@Override
@ -40,34 +42,41 @@ public class EsqlQueryResponseProfileTests extends AbstractWireSerializingTestCa
}
private List<DriverProfile> randomDriverProfiles() {
return randomList(10, this::randomDriverProfile);
return randomList(
10,
() -> new DriverProfile(
randomIdentifier(),
randomIdentifier(),
randomIdentifier(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomList(10, this::randomOperatorStatus),
DriverSleeps.empty()
)
);
}
private DriverProfile randomDriverProfile() {
return new DriverProfile(
randomIdentifier(),
randomIdentifier(),
randomIdentifier(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomList(10, this::randomOperatorStatus),
DriverSleeps.empty()
private List<PlanProfile> randomPlanProfiles() {
return randomList(
10,
() -> new PlanProfile(randomIdentifier(), randomIdentifier(), randomIdentifier(), randomAlphanumericOfLength(1024))
);
}
private OperatorStatus randomOperatorStatus() {
String name = randomAlphaOfLength(4);
Operator.Status status = randomBoolean()
? null
: new AbstractPageMappingOperator.Status(
randomNonNegativeLong(),
randomNonNegativeInt(),
randomNonNegativeLong(),
randomNonNegativeLong()
);
return new OperatorStatus(name, status);
return new OperatorStatus(
randomAlphaOfLength(4),
randomBoolean()
? new AbstractPageMappingOperator.Status(
randomNonNegativeLong(),
randomNonNegativeInt(),
randomNonNegativeLong(),
randomNonNegativeLong()
)
: null
);
}
}

View File

@ -33,6 +33,7 @@ import org.elasticsearch.compute.operator.AbstractPageMappingOperator;
import org.elasticsearch.compute.operator.DriverProfile;
import org.elasticsearch.compute.operator.DriverSleeps;
import org.elasticsearch.compute.operator.OperatorStatus;
import org.elasticsearch.compute.operator.PlanProfile;
import org.elasticsearch.compute.test.TestBlockFactory;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasables;
@ -973,7 +974,8 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
List.of(new OperatorStatus("asdf", new AbstractPageMappingOperator.Status(10021, 10, 111, 222))),
DriverSleeps.empty()
)
)
),
List.of(new PlanProfile("test", "elasticsearch", "node-1", "plan tree"))
),
false,
false,
@ -1028,6 +1030,14 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
"last" : [ ]
}
}
],
"plans" : [
{
"description" : "test",
"cluster_name" : "elasticsearch",
"node_name" : "node-1",
"plan" : "plan tree"
}
]
}
}"""));

View File

@ -16,6 +16,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.compute.operator.DriverCompletionInfo;
import org.elasticsearch.compute.operator.DriverProfile;
import org.elasticsearch.compute.operator.DriverSleeps;
import org.elasticsearch.compute.operator.PlanProfile;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.test.ESTestCase;
@ -59,11 +60,13 @@ public class ComputeListenerTests extends ESTestCase {
}
private DriverCompletionInfo randomCompletionInfo() {
int numProfiles = randomIntBetween(0, 2);
List<DriverProfile> profiles = new ArrayList<>(numProfiles);
for (int i = 0; i < numProfiles; i++) {
profiles.add(
new DriverProfile(
return new DriverCompletionInfo(
randomNonNegativeLong(),
randomNonNegativeLong(),
randomList(
0,
2,
() -> new DriverProfile(
randomIdentifier(),
randomIdentifier(),
randomIdentifier(),
@ -75,9 +78,13 @@ public class ComputeListenerTests extends ESTestCase {
List.of(),
DriverSleeps.empty()
)
);
}
return new DriverCompletionInfo(randomNonNegativeLong(), randomNonNegativeLong(), profiles);
),
randomList(
0,
2,
() -> new PlanProfile(randomIdentifier(), randomIdentifier(), randomIdentifier(), randomAlphaOfLengthBetween(1, 1024))
)
);
}
public void testEmpty() {
@ -86,7 +93,7 @@ public class ComputeListenerTests extends ESTestCase {
assertFalse(results.isDone());
}
assertTrue(results.isDone());
assertThat(results.actionGet(10, TimeUnit.SECONDS).collectedProfiles(), empty());
assertThat(results.actionGet(10, TimeUnit.SECONDS).driverProfiles(), empty());
}
public void testCollectComputeResults() {
@ -109,7 +116,7 @@ public class ComputeListenerTests extends ESTestCase {
var info = randomCompletionInfo();
documentsFound += info.documentsFound();
valuesLoaded += info.valuesLoaded();
allProfiles.addAll(info.collectedProfiles());
allProfiles.addAll(info.driverProfiles());
ActionListener<DriverCompletionInfo> subListener = computeListener.acquireCompute();
threadPool.schedule(
ActionRunnable.wrap(subListener, l -> l.onResponse(info)),
@ -123,7 +130,7 @@ public class ComputeListenerTests extends ESTestCase {
assertThat(actual.documentsFound(), equalTo(documentsFound));
assertThat(actual.valuesLoaded(), equalTo(valuesLoaded));
assertThat(
actual.collectedProfiles().stream().collect(Collectors.toMap(p -> p, p -> 1, Integer::sum)),
actual.driverProfiles().stream().collect(Collectors.toMap(p -> p, p -> 1, Integer::sum)),
equalTo(allProfiles.stream().collect(Collectors.toMap(p -> p, p -> 1, Integer::sum)))
);
assertThat(onFailure.get(), equalTo(0));
@ -178,7 +185,7 @@ public class ComputeListenerTests extends ESTestCase {
assertThat(result.documentsFound(), equalTo(documentsFound.get()));
assertThat(result.valuesLoaded(), equalTo(valuesLoaded.get()));
assertThat(
result.collectedProfiles().stream().collect(Collectors.toMap(p -> p, p -> 1, Integer::sum)),
result.driverProfiles().stream().collect(Collectors.toMap(p -> p, p -> 1, Integer::sum)),
equalTo(allProfiles.stream().collect(Collectors.toMap(p -> p, p -> 1, Integer::sum)))
);
Map<String, Set<String>> responseHeaders = threadPool.getThreadContext()
@ -216,7 +223,7 @@ public class ComputeListenerTests extends ESTestCase {
var resp = randomCompletionInfo();
documentsFound.addAndGet(resp.documentsFound());
valuesLoaded.addAndGet(resp.valuesLoaded());
allProfiles.addAll(resp.collectedProfiles());
allProfiles.addAll(resp.driverProfiles());
int numWarnings = randomIntBetween(1, 5);
Map<String, String> warnings = new HashMap<>();
for (int i = 0; i < numWarnings; i++) {