Add ESQL telemetry collection (#119474)

* Add ESQL telemetry collection
This commit is contained in:
Stanislav Malyshev 2025-01-02 14:05:21 -07:00 committed by GitHub
parent 10930498a4
commit 0292905ef6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
28 changed files with 862 additions and 124 deletions

View File

@ -0,0 +1,5 @@
pr: 119474
summary: "Add ES|QL cross-cluster query telemetry collection"
area: ES|QL
type: enhancement
issues: []

View File

@ -25,7 +25,6 @@ Returns cluster statistics.
* If the {es} {security-features} are enabled, you must have the `monitor` or
`manage` <<privileges-list-cluster,cluster privilege>> to use this API.
[[cluster-stats-api-desc]]
==== {api-description-title}
@ -1397,7 +1396,7 @@ as a human-readable string.
`_search`:::
(object) Contains the information about the <<modules-cross-cluster-search, {ccs}>> usage in the cluster.
(object) Contains information about <<modules-cross-cluster-search, {ccs}>> usage.
+
.Properties of `_search`
[%collapsible%open]
@ -1528,7 +1527,11 @@ This may include requests where partial results were returned, but not requests
=======
======
`_esql`:::
(object) Contains information about <<esql-cross-clusters,{esql} {ccs}>> usage.
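The structure of the object is the same as the `_search` object above, except that it does not
include the `took_mrt_true` and `took_mrt_false` fields, because {esql} has no `minimize_roundtrips` option.
The object is only present if at least one {esql} {ccs} request has been recorded.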
=====

View File

@ -40,18 +40,12 @@ import org.elasticsearch.search.retriever.RetrieverBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.AbstractMultiClustersTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.SkipUnavailableRule;
import org.elasticsearch.test.SkipUnavailableRule.NotSkipped;
import org.elasticsearch.usage.UsageService;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@ -59,8 +53,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.elasticsearch.action.admin.cluster.stats.CCSUsageTelemetry.ASYNC_FEATURE;
import static org.elasticsearch.action.admin.cluster.stats.CCSUsageTelemetry.MRT_FEATURE;
@ -498,7 +490,7 @@ public class CCSUsageTelemetryIT extends AbstractMultiClustersTestCase {
assertThat(perCluster.get(REMOTE2), equalTo(null));
}
@SkipOverride(aliases = { REMOTE1 })
@NotSkipped(aliases = { REMOTE1 })
public void testRemoteTimesOutFailure() throws Exception {
Map<String, Object> testClusterInfo = setupClusters();
String remoteIndex = (String) testClusterInfo.get("remote.index");
@ -528,7 +520,7 @@ public class CCSUsageTelemetryIT extends AbstractMultiClustersTestCase {
/**
* Search when all the remotes failed and not skipped
*/
@SkipOverride(aliases = { REMOTE1, REMOTE2 })
@NotSkipped(aliases = { REMOTE1, REMOTE2 })
public void testFailedAllRemotesSearch() throws Exception {
Map<String, Object> testClusterInfo = setupClusters();
String localIndex = (String) testClusterInfo.get("local.index");
@ -577,7 +569,7 @@ public class CCSUsageTelemetryIT extends AbstractMultiClustersTestCase {
/**
* Test that we're still counting remote search even if remote cluster has no such index
*/
@SkipOverride(aliases = { REMOTE1 })
@NotSkipped(aliases = { REMOTE1 })
public void testRemoteHasNoIndexFailure() throws Exception {
SearchRequest searchRequest = makeSearchRequest(REMOTE1 + ":no_such_index");
CCSTelemetrySnapshot telemetry = getTelemetryFromFailedSearch(searchRequest);
@ -695,40 +687,4 @@ public class CCSUsageTelemetryIT extends AbstractMultiClustersTestCase {
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).execute(listener.safeMap(r -> null));
}
/**
* Annotation to mark specific cluster in a test as not to be skipped when unavailable
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface SkipOverride {
String[] aliases();
}
/**
* Test rule to process skip annotations
*/
static class SkipUnavailableRule implements TestRule {
private final Map<String, Boolean> skipMap;
SkipUnavailableRule(String... clusterAliases) {
this.skipMap = Arrays.stream(clusterAliases).collect(Collectors.toMap(Function.identity(), alias -> true));
}
public Map<String, Boolean> getMap() {
return skipMap;
}
@Override
public Statement apply(Statement base, Description description) {
// Check for annotation named "SkipOverride" and set the overrides accordingly
var aliases = description.getAnnotation(SkipOverride.class);
if (aliases != null) {
for (String alias : aliases.aliases()) {
skipMap.put(alias, false);
}
}
return base;
}
}
}

View File

@ -148,6 +148,7 @@ public class TransportVersions {
public static final TransportVersion SIMULATE_IGNORED_FIELDS = def(8_813_00_0);
public static final TransportVersion TRANSFORMS_UPGRADE_MODE = def(8_814_00_0);
public static final TransportVersion NODE_SHUTDOWN_EPHEMERAL_ID_ADDED = def(8_815_00_0);
public static final TransportVersion ESQL_CCS_TELEMETRY_STATS = def(8_816_00_0);
/*
* STOP! READ THIS FIRST! No, really,

View File

@ -41,7 +41,6 @@ import java.util.Objects;
* <br>
*/
public final class CCSTelemetrySnapshot implements Writeable, ToXContentFragment {
public static final String CCS_TELEMETRY_FIELD_NAME = "_search";
private long totalCount;
private long successCount;
private final Map<String, Long> failureReasons;
@ -66,6 +65,9 @@ public final class CCSTelemetrySnapshot implements Writeable, ToXContentFragment
private final Map<String, Long> clientCounts;
private final Map<String, PerClusterCCSTelemetry> byRemoteCluster;
// Whether we should use per-MRT (minimize roundtrips) metrics.
// ES|QL does not have "minimize_roundtrips" option, so we don't collect those metrics for ES|QL usage.
private boolean useMRT = true;
/**
* Creates a new stats instance with the provided info.
@ -191,6 +193,11 @@ public final class CCSTelemetrySnapshot implements Writeable, ToXContentFragment
return Collections.unmodifiableMap(byRemoteCluster);
}
/**
 * Sets whether this snapshot uses the per-MRT (minimize roundtrips) latency metrics.
 * The flag is checked when another snapshot is merged into this one and when this
 * snapshot is rendered to XContent.
 *
 * @param useMRT {@code true} to use the per-MRT metrics, {@code false} to leave them out
 * @return this snapshot, so that the call can be chained
 */
public CCSTelemetrySnapshot setUseMRT(boolean useMRT) {
this.useMRT = useMRT;
return this;
}
public static class PerClusterCCSTelemetry implements Writeable, ToXContentFragment {
private long count;
private long skippedCount;
@ -270,6 +277,11 @@ public final class CCSTelemetrySnapshot implements Writeable, ToXContentFragment
public int hashCode() {
return Objects.hash(count, skippedCount, took);
}
@Override
public String toString() {
return Strings.toString(this, true, true);
}
}
/**
@ -291,8 +303,10 @@ public final class CCSTelemetrySnapshot implements Writeable, ToXContentFragment
stats.featureCounts.forEach((k, v) -> featureCounts.merge(k, v, Long::sum));
stats.clientCounts.forEach((k, v) -> clientCounts.merge(k, v, Long::sum));
took.add(stats.took);
tookMrtTrue.add(stats.tookMrtTrue);
tookMrtFalse.add(stats.tookMrtFalse);
if (useMRT) {
tookMrtTrue.add(stats.tookMrtTrue);
tookMrtFalse.add(stats.tookMrtFalse);
}
remotesPerSearchMax = Math.max(remotesPerSearchMax, stats.remotesPerSearchMax);
if (totalCount > 0 && oldCount > 0) {
// Weighted average
@ -328,30 +342,28 @@ public final class CCSTelemetrySnapshot implements Writeable, ToXContentFragment
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(CCS_TELEMETRY_FIELD_NAME);
{
builder.field("total", totalCount);
builder.field("success", successCount);
builder.field("skipped", skippedRemotes);
publishLatency(builder, "took", took);
builder.field("total", totalCount);
builder.field("success", successCount);
builder.field("skipped", skippedRemotes);
publishLatency(builder, "took", took);
if (useMRT) {
publishLatency(builder, "took_mrt_true", tookMrtTrue);
publishLatency(builder, "took_mrt_false", tookMrtFalse);
builder.field("remotes_per_search_max", remotesPerSearchMax);
builder.field("remotes_per_search_avg", remotesPerSearchAvg);
builder.field("failure_reasons", failureReasons);
builder.field("features", featureCounts);
builder.field("clients", clientCounts);
builder.startObject("clusters");
{
for (var entry : byRemoteCluster.entrySet()) {
String remoteName = entry.getKey();
if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(remoteName)) {
remoteName = SearchResponse.LOCAL_CLUSTER_NAME_REPRESENTATION;
}
builder.field(remoteName, entry.getValue());
}
builder.field("remotes_per_search_max", remotesPerSearchMax);
builder.field("remotes_per_search_avg", remotesPerSearchAvg);
builder.field("failure_reasons", failureReasons);
builder.field("features", featureCounts);
builder.field("clients", clientCounts);
builder.startObject("clusters");
{
for (var entry : byRemoteCluster.entrySet()) {
String remoteName = entry.getKey();
if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(remoteName)) {
remoteName = SearchResponse.LOCAL_CLUSTER_NAME_REPRESENTATION;
}
builder.field(remoteName, entry.getValue());
}
builder.endObject();
}
builder.endObject();
return builder;

View File

@ -10,6 +10,7 @@
package org.elasticsearch.action.admin.cluster.stats;
import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ShardOperationFailedException;
@ -20,6 +21,7 @@ import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.query.SearchTimeoutException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
import java.util.Arrays;
@ -84,6 +86,15 @@ public class CCSUsage {
return this;
}
/**
 * Sets the client from the product-origin header of the given task.
 * If the task carries no such header, the builder is left untouched.
 *
 * @param task task whose {@code Task.X_ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER} header is read
 * @return the builder returned by {@code setClient} when the header is present, otherwise this builder
 */
public Builder setClientFromTask(Task task) {
    final String productOrigin = task.getHeader(Task.X_ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER);
    return productOrigin == null ? this : setClient(productOrigin);
}
public Builder skippedRemote(String remote) {
this.skippedRemotes.add(remote);
return this;
@ -133,6 +144,10 @@ public class CCSUsage {
if (ExceptionsHelper.unwrapCorruption(e) != null) {
return Result.CORRUPTION;
}
ElasticsearchStatusException se = (ElasticsearchStatusException) ExceptionsHelper.unwrap(e, ElasticsearchStatusException.class);
if (se != null && se.getDetailedMessage().contains("license")) {
return Result.LICENSE;
}
// This is kind of last resort check - if we still don't know the reason but all shard failures are remote,
// we assume it's remote's fault somehow.
if (e instanceof SearchPhaseExecutionException spe) {

View File

@ -47,6 +47,7 @@ public class CCSUsageTelemetry {
TIMEOUT("timeout"),
CORRUPTION("corruption"),
SECURITY("security"),
LICENSE("license"),
// May be helpful if there's a lot of other reasons, and it may be hard to calculate the unknowns for some clients.
UNKNOWN("other");
@ -106,8 +107,14 @@ public class CCSUsageTelemetry {
private final Map<String, LongAdder> clientCounts;
private final Map<String, PerClusterCCSTelemetry> byRemoteCluster;
// Should we calculate separate metrics per MRT?
private final boolean useMRT;
public CCSUsageTelemetry() {
this(true);
}
public CCSUsageTelemetry(boolean useMRT) {
this.byRemoteCluster = new ConcurrentHashMap<>();
totalCount = new LongAdder();
successCount = new LongAdder();
@ -119,6 +126,7 @@ public class CCSUsageTelemetry {
skippedRemotes = new LongAdder();
featureCounts = new ConcurrentHashMap<>();
clientCounts = new ConcurrentHashMap<>();
this.useMRT = useMRT;
}
public void updateUsage(CCSUsage ccsUsage) {
@ -134,10 +142,12 @@ public class CCSUsageTelemetry {
if (isSuccess(ccsUsage)) {
successCount.increment();
took.record(searchTook);
if (isMRT(ccsUsage)) {
tookMrtTrue.record(searchTook);
} else {
tookMrtFalse.record(searchTook);
if (useMRT) {
if (isMRT(ccsUsage)) {
tookMrtTrue.record(searchTook);
} else {
tookMrtFalse.record(searchTook);
}
}
ccsUsage.getPerClusterUsage().forEach((r, u) -> byRemoteCluster.computeIfAbsent(r, PerClusterCCSTelemetry::new).update(u));
} else {
@ -243,6 +253,6 @@ public class CCSUsageTelemetry {
Collections.unmodifiableMap(Maps.transformValues(featureCounts, LongAdder::longValue)),
Collections.unmodifiableMap(Maps.transformValues(clientCounts, LongAdder::longValue)),
Collections.unmodifiableMap(Maps.transformValues(byRemoteCluster, PerClusterCCSTelemetry::getSnapshot))
);
).setUseMRT(useMRT);
}
}

View File

@ -31,7 +31,8 @@ public class ClusterStatsNodeResponse extends BaseNodeResponse {
private final ClusterHealthStatus clusterStatus;
private final SearchUsageStats searchUsageStats;
private final RepositoryUsageStats repositoryUsageStats;
private final CCSTelemetrySnapshot ccsMetrics;
private final CCSTelemetrySnapshot searchCcsMetrics;
private final CCSTelemetrySnapshot esqlCcsMetrics;
public ClusterStatsNodeResponse(StreamInput in) throws IOException {
super(in);
@ -46,10 +47,15 @@ public class ClusterStatsNodeResponse extends BaseNodeResponse {
}
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) {
repositoryUsageStats = RepositoryUsageStats.readFrom(in);
ccsMetrics = new CCSTelemetrySnapshot(in);
searchCcsMetrics = new CCSTelemetrySnapshot(in);
} else {
repositoryUsageStats = RepositoryUsageStats.EMPTY;
ccsMetrics = new CCSTelemetrySnapshot();
searchCcsMetrics = new CCSTelemetrySnapshot();
}
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_CCS_TELEMETRY_STATS)) {
esqlCcsMetrics = new CCSTelemetrySnapshot(in);
} else {
esqlCcsMetrics = new CCSTelemetrySnapshot();
}
}
@ -61,7 +67,8 @@ public class ClusterStatsNodeResponse extends BaseNodeResponse {
ShardStats[] shardsStats,
SearchUsageStats searchUsageStats,
RepositoryUsageStats repositoryUsageStats,
CCSTelemetrySnapshot ccsTelemetrySnapshot
CCSTelemetrySnapshot ccsTelemetrySnapshot,
CCSTelemetrySnapshot esqlTelemetrySnapshot
) {
super(node);
this.nodeInfo = nodeInfo;
@ -70,7 +77,8 @@ public class ClusterStatsNodeResponse extends BaseNodeResponse {
this.clusterStatus = clusterStatus;
this.searchUsageStats = Objects.requireNonNull(searchUsageStats);
this.repositoryUsageStats = Objects.requireNonNull(repositoryUsageStats);
this.ccsMetrics = ccsTelemetrySnapshot;
this.searchCcsMetrics = ccsTelemetrySnapshot;
this.esqlCcsMetrics = esqlTelemetrySnapshot;
}
public NodeInfo nodeInfo() {
@ -101,8 +109,12 @@ public class ClusterStatsNodeResponse extends BaseNodeResponse {
return repositoryUsageStats;
}
public CCSTelemetrySnapshot getCcsMetrics() {
return ccsMetrics;
public CCSTelemetrySnapshot getSearchCcsMetrics() {
return searchCcsMetrics;
}
public CCSTelemetrySnapshot getEsqlCcsMetrics() {
return esqlCcsMetrics;
}
@Override
@ -117,8 +129,11 @@ public class ClusterStatsNodeResponse extends BaseNodeResponse {
}
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) {
repositoryUsageStats.writeTo(out);
ccsMetrics.writeTo(out);
searchCcsMetrics.writeTo(out);
} // else just drop these stats, ok for bwc
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_CCS_TELEMETRY_STATS)) {
esqlCcsMetrics.writeTo(out);
}
}
}

View File

@ -36,10 +36,14 @@ public class ClusterStatsResponse extends BaseNodesResponse<ClusterStatsNodeResp
final ClusterSnapshotStats clusterSnapshotStats;
final RepositoryUsageStats repositoryUsageStats;
final CCSTelemetrySnapshot ccsMetrics;
final CCSTelemetrySnapshot esqlMetrics;
final long timestamp;
final String clusterUUID;
private final Map<String, RemoteClusterStats> remoteClustersStats;
public static final String CCS_TELEMETRY_FIELD_NAME = "_search";
public static final String ESQL_TELEMETRY_FIELD_NAME = "_esql";
public ClusterStatsResponse(
long timestamp,
String clusterUUID,
@ -58,6 +62,7 @@ public class ClusterStatsResponse extends BaseNodesResponse<ClusterStatsNodeResp
nodesStats = new ClusterStatsNodes(nodes);
indicesStats = new ClusterStatsIndices(nodes, mappingStats, analysisStats, versionStats);
ccsMetrics = new CCSTelemetrySnapshot();
esqlMetrics = new CCSTelemetrySnapshot().setUseMRT(false);
ClusterHealthStatus status = null;
for (ClusterStatsNodeResponse response : nodes) {
// only the master node populates the status
@ -66,7 +71,10 @@ public class ClusterStatsResponse extends BaseNodesResponse<ClusterStatsNodeResp
break;
}
}
nodes.forEach(node -> ccsMetrics.add(node.getCcsMetrics()));
nodes.forEach(node -> {
ccsMetrics.add(node.getSearchCcsMetrics());
esqlMetrics.add(node.getEsqlCcsMetrics());
});
this.status = status;
this.clusterSnapshotStats = clusterSnapshotStats;
@ -147,9 +155,18 @@ public class ClusterStatsResponse extends BaseNodesResponse<ClusterStatsNodeResp
if (remoteClustersStats != null) {
builder.field("clusters", remoteClustersStats);
}
builder.startObject(CCS_TELEMETRY_FIELD_NAME);
ccsMetrics.toXContent(builder, params);
builder.endObject();
if (esqlMetrics.getTotalCount() > 0) {
builder.startObject(ESQL_TELEMETRY_FIELD_NAME);
esqlMetrics.toXContent(builder, params);
builder.endObject();
}
builder.endObject();
return builder;
}

View File

@ -103,6 +103,7 @@ public class TransportClusterStatsAction extends TransportNodesAction<
private final RepositoriesService repositoriesService;
private final SearchUsageHolder searchUsageHolder;
private final CCSUsageTelemetry ccsUsageHolder;
private final CCSUsageTelemetry esqlUsageHolder;
private final Executor clusterStateStatsExecutor;
private final MetadataStatsCache<MappingStats> mappingStatsCache;
@ -135,6 +136,7 @@ public class TransportClusterStatsAction extends TransportNodesAction<
this.repositoriesService = repositoriesService;
this.searchUsageHolder = usageService.getSearchUsageHolder();
this.ccsUsageHolder = usageService.getCcsUsageHolder();
this.esqlUsageHolder = usageService.getEsqlUsageHolder();
this.clusterStateStatsExecutor = threadPool.executor(ThreadPool.Names.MANAGEMENT);
this.mappingStatsCache = new MetadataStatsCache<>(threadPool.getThreadContext(), MappingStats::of);
this.analysisStatsCache = new MetadataStatsCache<>(threadPool.getThreadContext(), AnalysisStats::of);
@ -293,6 +295,7 @@ public class TransportClusterStatsAction extends TransportNodesAction<
final RepositoryUsageStats repositoryUsageStats = repositoriesService.getUsageStats();
final CCSTelemetrySnapshot ccsTelemetry = ccsUsageHolder.getCCSTelemetrySnapshot();
final CCSTelemetrySnapshot esqlTelemetry = esqlUsageHolder.getCCSTelemetrySnapshot();
return new ClusterStatsNodeResponse(
nodeInfo.getNode(),
@ -302,7 +305,8 @@ public class TransportClusterStatsAction extends TransportNodesAction<
shardsStats.toArray(new ShardStats[shardsStats.size()]),
searchUsageStats,
repositoryUsageStats,
ccsTelemetry
ccsTelemetry,
esqlTelemetry
);
}

View File

@ -388,10 +388,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
if (original.pointInTimeBuilder() != null) {
tl.setFeature(CCSUsageTelemetry.PIT_FEATURE);
}
String client = task.getHeader(Task.X_ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER);
if (client != null) {
tl.setClient(client);
}
tl.setClient(task);
// Check if any of the index patterns are wildcard patterns
var localIndices = resolvedIndices.getLocalIndices();
if (localIndices != null && Arrays.stream(localIndices.indices()).anyMatch(Regex::isSimpleMatchPattern)) {
@ -508,6 +505,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
}
}
});
final SearchSourceBuilder source = original.source();
if (shouldOpenPIT(source)) {
// disabling shard reordering for request
@ -1883,7 +1881,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
void setFeature(String feature);
void setClient(String client);
void setClient(Task task);
}
private class SearchResponseActionListener extends DelegatingActionListener<SearchResponse, SearchResponse>
@ -1917,8 +1915,8 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
}
@Override
public void setClient(String client) {
usageBuilder.setClient(client);
public void setClient(Task task) {
usageBuilder.setClientFromTask(task);
}
@Override

View File

@ -33,7 +33,8 @@ public class RestClusterStatsAction extends BaseRestHandler {
"human-readable-total-docs-size",
"verbose-dense-vector-mapping-stats",
"ccs-stats",
"retrievers-usage-stats"
"retrievers-usage-stats",
"esql-stats"
);
private static final Set<String> SUPPORTED_QUERY_PARAMETERS = Set.of("include_remotes", "nodeId", REST_TIMEOUT_PARAM);

View File

@ -26,11 +26,13 @@ public class UsageService {
private final Map<String, BaseRestHandler> handlers;
private final SearchUsageHolder searchUsageHolder;
private final CCSUsageTelemetry ccsUsageHolder;
private final CCSUsageTelemetry esqlUsageHolder;
public UsageService() {
this.handlers = new HashMap<>();
this.searchUsageHolder = new SearchUsageHolder();
this.ccsUsageHolder = new CCSUsageTelemetry();
this.esqlUsageHolder = new CCSUsageTelemetry(false);
}
/**
@ -89,4 +91,8 @@ public class UsageService {
public CCSUsageTelemetry getCcsUsageHolder() {
return ccsUsageHolder;
}
public CCSUsageTelemetry getEsqlUsageHolder() {
return esqlUsageHolder;
}
}

View File

@ -352,4 +352,20 @@ public class CCSTelemetrySnapshotTests extends AbstractWireSerializingTestCase<C
assertThat(value2Read.count(), equalTo(count1 + count2));
assertThat(value2Read.max(), equalTo(max1));
}
/**
 * A snapshot that was told not to use MRT metrics must keep its MRT latency
 * metrics empty after merging in a snapshot that does carry MRT data.
 */
public void testUseMRTFalse() {
    CCSTelemetrySnapshot mrtDisabled = new CCSTelemetrySnapshot();
    // Ignore MRT data
    mrtDisabled.setUseMRT(false);

    // Keep drawing random snapshots until one has both MRT=true and MRT=false latency samples.
    var source = randomValueOtherThanMany(
        candidate -> candidate.getTookMrtTrue().count() == 0 || candidate.getTookMrtFalse().count() == 0,
        this::randomCCSTelemetrySnapshot
    );

    mrtDisabled.add(source);

    // Overall latency is merged, per-MRT latency is not.
    assertThat(mrtDisabled.getTook().count(), equalTo(source.getTook().count()));
    assertThat(mrtDisabled.getTookMrtFalse().count(), equalTo(0L));
    assertThat(mrtDisabled.getTookMrtTrue().count(), equalTo(0L));
}
}

View File

@ -340,4 +340,23 @@ public class CCSUsageTelemetryTests extends ESTestCase {
CCSTelemetrySnapshot expectedSnapshot = ccsUsageHolder.getCCSTelemetrySnapshot();
assertThat(snapshot, equalTo(expectedSnapshot));
}
/**
 * A telemetry holder created with {@code useMRT == false} must count every usage in the
 * overall latency metric but record nothing in the per-MRT latency metrics.
 */
public void testUseMRTFalse() {
    // Ignore MRT counters if instructed.
    final CCSUsageTelemetry telemetry = new CCSUsageTelemetry(false);

    // First usage: carries the MRT feature.
    CCSUsage.Builder mrtUsage = new CCSUsage.Builder();
    mrtUsage.took(10L).setRemotesCount(1).setClient("kibana");
    mrtUsage.setFeature(MRT_FEATURE);
    telemetry.updateUsage(mrtUsage.build());

    // Second usage: no MRT feature.
    CCSUsage.Builder plainUsage = new CCSUsage.Builder();
    plainUsage.took(11L).setRemotesCount(1).setClient("kibana");
    telemetry.updateUsage(plainUsage.build());

    final CCSTelemetrySnapshot result = telemetry.getCCSTelemetrySnapshot();
    assertThat(result.getTook().count(), equalTo(2L));
    assertThat(result.getTookMrtFalse().count(), equalTo(0L));
    assertThat(result.getTookMrtTrue().count(), equalTo(0L));
}
}

View File

@ -130,6 +130,7 @@ public class VersionStatsTests extends AbstractWireSerializingTestCase<VersionSt
new ShardStats[] { shardStats },
new SearchUsageStats(),
RepositoryUsageStats.EMPTY,
null,
null
);

View File

@ -1,5 +1,4 @@
{
"_search" : {
"total" : 10,
"success" : 20,
"skipped" : 5,
@ -63,5 +62,4 @@
}
}
}
}
}
}

View File

@ -0,0 +1,60 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.test;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
 * Test rule to process skip_unavailable override annotations.
 * <p>
 * Every cluster alias passed to the constructor starts out mapped to {@code true}.
 * When the running test method is annotated with {@link NotSkipped}, the aliases
 * listed in the annotation are switched to {@code false}.
 */
public class SkipUnavailableRule implements TestRule {
    private final Map<String, Boolean> skipMap;

    /**
     * @param clusterAliases aliases of the clusters to track; each one is initially mapped to {@code true}
     */
    public SkipUnavailableRule(String... clusterAliases) {
        // Collectors.toMap gives no guarantee that the returned map is mutable, but apply() has to
        // put() overrides into it, so the collected entries are copied into a HashMap.
        this.skipMap = new HashMap<>(Arrays.stream(clusterAliases).collect(Collectors.toMap(Function.identity(), alias -> true)));
    }

    /**
     * @return the live alias-to-flag map maintained by this rule (not a copy)
     */
    public Map<String, Boolean> getMap() {
        return skipMap;
    }

    /**
     * Applies the {@link NotSkipped} overrides of the described test, if any, and returns {@code base} unchanged.
     */
    @Override
    public Statement apply(Statement base, Description description) {
        // Check for the NotSkipped annotation and set the overrides accordingly
        var aliases = description.getAnnotation(NotSkipped.class);
        if (aliases != null) {
            for (String alias : aliases.aliases()) {
                skipMap.put(alias, false);
            }
        }
        return base;
    }

    /**
     * Annotation to mark specific cluster in a test as not to be skipped when unavailable
     */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface NotSkipped {
        String[] aliases();
    }
}

View File

@ -12,6 +12,7 @@ import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
import org.apache.http.HttpHost;
import org.elasticsearch.Version;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
@ -37,9 +38,11 @@ import java.util.stream.Stream;
import static org.elasticsearch.test.MapMatcher.assertMap;
import static org.elasticsearch.test.MapMatcher.matchesMap;
import static org.elasticsearch.xpack.esql.ccq.Clusters.REMOTE_CLUSTER_NAME;
import static org.hamcrest.Matchers.any;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasKey;
@ThreadLeakFilters(filters = TestClustersThreadFilter.class)
public class MultiClustersIT extends ESRestTestCase {
@ -395,6 +398,38 @@ public class MultiClustersIT extends ESRestTestCase {
}
}
/**
 * Runs a cross-cluster ES|QL query and then checks that {@code _cluster/stats} reports
 * telemetry under {@code ccs._esql}, including per-cluster entries for the remote and the
 * local cluster. The test is skipped unless the {@code esql-stats} capability is reported
 * as supported.
 */
@SuppressWarnings("unchecked")
public void testStats() throws IOException {
    Request caps = new Request("GET", "_capabilities?method=GET&path=_cluster/stats&capabilities=esql-stats");
    Response capsResponse = client().performRequest(caps);
    Map<String, Object> capsResult = entityAsMap(capsResponse.getEntity());
    // Map.get returns null when "supported" is absent or mapped to null; anything other than an
    // explicit true is treated as "not supported" instead of failing with a NullPointerException.
    assumeTrue("esql stats capability missing", Boolean.TRUE.equals(capsResult.get("supported")));

    // Trigger telemetry collection with a query that touches both the local and the remote cluster.
    run("FROM test-local-index,*:test-remote-index | STATS total = SUM(data) BY color | SORT color", includeCCSMetadata());

    Request stats = new Request("GET", "_cluster/stats");
    Response statsResponse = client().performRequest(stats);
    Map<String, Object> result = entityAsMap(statsResponse.getEntity());
    assertThat(result, hasKey("ccs"));
    Map<String, Object> ccs = (Map<String, Object>) result.get("ccs");
    assertThat(ccs, hasKey("_esql"));
    Map<String, Object> esql = (Map<String, Object>) ccs.get("_esql");
    assertThat(esql, hasKey("total"));
    assertThat(esql, hasKey("success"));
    assertThat(esql, hasKey("took"));
    assertThat(esql, hasKey("remotes_per_search_max"));
    assertThat(esql, hasKey("remotes_per_search_avg"));
    assertThat(esql, hasKey("failure_reasons"));
    assertThat(esql, hasKey("features"));
    assertThat(esql, hasKey("clusters"));
    Map<String, Object> clusters = (Map<String, Object>) esql.get("clusters");
    assertThat(clusters, hasKey(REMOTE_CLUSTER_NAME));
    assertThat(clusters, hasKey("(local)"));
    Map<String, Object> clusterData = (Map<String, Object>) clusters.get(REMOTE_CLUSTER_NAME);
    assertThat(clusterData, hasKey("total"));
    assertThat(clusterData, hasKey("skipped"));
    assertThat(clusterData, hasKey("took"));
}
private RestClient remoteClusterClient() throws IOException {
var clusterHosts = parseClusterHosts(remoteCluster.getHttpAddresses());
return buildClient(restClientSettings(), clusterHosts.toArray(new HttpHost[0]));

View File

@ -0,0 +1,205 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.esql.action;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.cluster.stats.CCSTelemetrySnapshot;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.AbstractMultiClustersTestCase;
import org.elasticsearch.test.SkipUnavailableRule;
import org.elasticsearch.usage.UsageService;
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.core.TimeValue.timeValueMillis;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
public class AbstractCrossClustersUsageTelemetryIT extends AbstractMultiClustersTestCase {
private static final Logger LOGGER = LogManager.getLogger(AbstractCrossClustersUsageTelemetryIT.class);
protected static final String REMOTE1 = "cluster-a";
protected static final String REMOTE2 = "cluster-b";
protected static final String LOCAL_INDEX = "logs-1";
protected static final String REMOTE_INDEX = "logs-2";
// Every query is sent to one specific node (which one does not matter) so that
// the CCS telemetry can later be collected from that same node.
protected String queryNode;
/**
 * Picks the node that all queries of the current test are sent to.
 */
@Before
public void setupQueryNode() {
    // Within a single test every query goes to the same node, so telemetry accumulates there
    // and can be collected incrementally; a different random node is chosen for each test.
    var localCluster = cluster(LOCAL_CLUSTER);
    queryNode = localCluster.getRandomNodeName();
}
/**
 * Builds a synchronous ES|QL request for {@code query} with randomized pragmas, columnar and
 * CCS-metadata settings, runs it, and returns the telemetry snapshot taken afterwards.
 *
 * @param query  the ES|QL query text
 * @param client value passed on to the request-based overload; may be {@code null}
 */
protected CCSTelemetrySnapshot getTelemetryFromQuery(String query, String client) throws ExecutionException, InterruptedException {
    final EsqlQueryRequest syncRequest = EsqlQueryRequest.syncEsqlQueryRequest();
    syncRequest.query(query);
    syncRequest.pragmas(AbstractEsqlIntegTestCase.randomPragmas());
    syncRequest.columnar(randomBoolean());
    syncRequest.includeCCSMetadata(randomBoolean());
    return getTelemetryFromQuery(syncRequest, client);
}
/**
 * Executes {@code request} on the query node and returns that node's ES|QL telemetry snapshot.
 *
 * @param request the query request to execute
 * @param client  if non-null, sent as the {@code Task.X_ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER} header
 */
protected CCSTelemetrySnapshot getTelemetryFromQuery(EsqlQueryRequest request, String client) throws ExecutionException,
    InterruptedException {
    // The response itself is of little interest here: the query only has to run so that telemetry
    // gets collected. It is checked for non-null and everything else is left to other tests.
    Client nodeClient = cluster(LOCAL_CLUSTER).client(queryNode);
    if (client != null) {
        nodeClient = nodeClient.filterWithHeader(Map.of(Task.X_ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER, client));
    }
    assertResponse(nodeClient.execute(EsqlQueryAction.INSTANCE, request), Assert::assertNotNull);
    return getTelemetrySnapshot(queryNode);
}
/**
 * Builds an asynchronous ES|QL request for {@code query} with randomized pragmas, columnar and
 * CCS-metadata settings, a 100ms wait-for-completion timeout and keep-on-completion disabled,
 * runs it, and returns the telemetry snapshot taken afterwards.
 */
protected CCSTelemetrySnapshot getTelemetryFromAsyncQuery(String query) throws Exception {
    final EsqlQueryRequest asyncRequest = EsqlQueryRequest.asyncEsqlQueryRequest();
    asyncRequest.query(query);
    asyncRequest.pragmas(AbstractEsqlIntegTestCase.randomPragmas());
    asyncRequest.columnar(randomBoolean());
    asyncRequest.includeCCSMetadata(randomBoolean());
    asyncRequest.waitForCompletionTimeout(timeValueMillis(100));
    asyncRequest.keepOnCompletion(false);
    return getTelemetryFromAsyncQuery(asyncRequest);
}
/**
 * Submits {@code request} on the query node and, if the initial response reports the query as
 * still running, polls the async-result action until it has finished. Returns the query node's
 * telemetry snapshot afterwards.
 *
 * @param request the (async) ES|QL request to execute
 * @return the CCS telemetry snapshot taken from the query node once the query is no longer running
 */
protected CCSTelemetrySnapshot getTelemetryFromAsyncQuery(EsqlQueryRequest request) throws Exception {
    final AtomicReference<String> pendingId = new AtomicReference<>();
    assertResponse(cluster(LOCAL_CLUSTER).client(queryNode).execute(EsqlQueryAction.INSTANCE, request), initialResponse -> {
        if (initialResponse.isRunning() == false) {
            // Finished within the wait-for-completion timeout; nothing to poll for.
            return;
        }
        // NOTE(review): asyncExecutionId() looks like an Optional, in which case this null check
        // can never fail - confirm whether an isPresent() check was intended.
        assertNotNull("async execution id is null", initialResponse.asyncExecutionId());
        pendingId.set(initialResponse.asyncExecutionId().get());
    });

    final String executionId = pendingId.get();
    if (executionId == null) {
        return getTelemetrySnapshot(queryNode);
    }

    assertBusy(() -> {
        var pollRequest = new GetAsyncResultRequest(executionId).setWaitForCompletionTimeout(timeValueMillis(1));
        try (
            var pollResponse = cluster(LOCAL_CLUSTER).client(queryNode)
                .execute(EsqlAsyncGetResultAction.INSTANCE, pollRequest)
                .actionGet(30, TimeUnit.SECONDS)
        ) {
            assertFalse(pollResponse.isRunning());
        }
    });
    return getTelemetrySnapshot(queryNode);
}
/**
 * Runs {@code query} as a synchronous ES|QL request that is expected to fail, verifies that
 * waiting for the result throws an {@link ExecutionException} with a cause, and returns the
 * telemetry snapshot of the query node.
 *
 * @param query the ES|QL query text that must fail
 * @return the CCS telemetry snapshot taken from the query node after the failure
 */
protected CCSTelemetrySnapshot getTelemetryFromFailedQuery(String query) throws Exception {
    final EsqlQueryRequest failingRequest = EsqlQueryRequest.syncEsqlQueryRequest();
    failingRequest.query(query);
    // Random values are drawn in the same order as they are applied: pragmas, columnar, CCS metadata.
    final var pragmas = AbstractEsqlIntegTestCase.randomPragmas();
    final boolean columnar = randomBoolean();
    final boolean includeCcsMetadata = randomBoolean();
    failingRequest.pragmas(pragmas);
    failingRequest.columnar(columnar);
    failingRequest.includeCCSMetadata(includeCcsMetadata);

    // The request is submitted here; only waiting for its result is expected to throw.
    var pendingResponse = cluster(LOCAL_CLUSTER).client(queryNode).execute(EsqlQueryAction.INSTANCE, failingRequest);
    ExecutionException failure = expectThrows(ExecutionException.class, pendingResponse::get);
    assertNotNull(failure.getCause());
    return getTelemetrySnapshot(queryNode);
}
/**
 * Reads the current ES|QL CCS telemetry snapshot from the {@link UsageService} of the given
 * local-cluster node.
 *
 * @param nodeName name of the node in the local cluster to read the telemetry from
 * @return the node's ES|QL CCS telemetry snapshot
 */
private CCSTelemetrySnapshot getTelemetrySnapshot(String nodeName) {
    return cluster(LOCAL_CLUSTER).getInstance(UsageService.class, nodeName).getEsqlUsageHolder().getCCSTelemetrySnapshot();
}
/**
 * Always returns {@code false}.
 * NOTE(review): presumably this makes the test framework build fresh clusters for every test so
 * telemetry counters start from zero - confirm against the base class contract.
 */
@Override
protected boolean reuseClusters() {
return false;
}
/**
 * Declares the aliases of the remote clusters used by these tests: {@code REMOTE1} and {@code REMOTE2}.
 */
@Override
protected List<String> remoteClusterAlias() {
return List.of(REMOTE1, REMOTE2);
}
// JUnit rule constructed for both remote aliases; its getMap() result is what
// skipUnavailableForRemoteClusters() returns as the skip_unavailable configuration.
@Rule
public SkipUnavailableRule skipOverride = new SkipUnavailableRule(REMOTE1, REMOTE2);
/**
 * Creates and populates the test index on the local cluster and on both remote clusters, each
 * with a random number of shards between 1 and 5.
 *
 * @return a map describing the setup, with the index name and shard count for each cluster under
 *         the {@code local.*}, {@code remote.*} and {@code remote2.*} keys
 */
protected Map<String, Object> setupClusters() {
    // Shard counts are drawn, and indices populated, in a fixed order: local, REMOTE1, REMOTE2.
    final int localShards = randomIntBetween(1, 5);
    populateLocalIndices(LOCAL_INDEX, localShards);

    final int remote1Shards = randomIntBetween(1, 5);
    populateRemoteIndices(REMOTE1, REMOTE_INDEX, remote1Shards);

    final int remote2Shards = randomIntBetween(1, 5);
    populateRemoteIndices(REMOTE2, REMOTE_INDEX, remote2Shards);

    final Map<String, Object> setupInfo = new HashMap<>();
    setupInfo.put("local.index", LOCAL_INDEX);
    setupInfo.put("local.num_shards", localShards);
    setupInfo.put("remote.index", REMOTE_INDEX);
    setupInfo.put("remote.num_shards", remote1Shards);
    setupInfo.put("remote2.index", REMOTE_INDEX);
    setupInfo.put("remote2.num_shards", remote2Shards);
    return setupInfo;
}
/**
 * Creates {@code indexName} on the local cluster with the given number of shards and a
 * keyword/keyword/long mapping for {@code id}/{@code tag}/{@code v}, indexes ten documents
 * ({@code id = "local-<i>"}, {@code tag = "local"}, {@code v = i}) and refreshes the index.
 *
 * @param indexName name of the index to create
 * @param numShards value for {@code index.number_of_shards}
 */
void populateLocalIndices(String indexName, int numShards) {
    final Client local = client(LOCAL_CLUSTER);
    final var shardSettings = Settings.builder().put("index.number_of_shards", numShards);
    final var createIndex = local.admin()
        .indices()
        .prepareCreate(indexName)
        .setSettings(shardSettings)
        .setMapping("id", "type=keyword", "tag", "type=keyword", "v", "type=long");
    assertAcked(createIndex);

    for (int docNumber = 0; docNumber < 10; docNumber++) {
        local.prepareIndex(indexName).setSource("id", "local-" + docNumber, "tag", "local", "v", docNumber).get();
    }
    local.admin().indices().prepareRefresh(indexName).get();
}
/**
 * Creates {@code indexName} on the remote cluster {@code clusterAlias} with the given number of
 * shards and a keyword/keyword/long mapping for {@code id}/{@code tag}/{@code v}, indexes ten
 * documents ({@code id = "remote-<i>"}, {@code tag = "remote"}, {@code v = i * i}) and refreshes
 * the index.
 *
 * @param clusterAlias alias of the remote cluster to populate
 * @param indexName    name of the index to create
 * @param numShards    value for {@code index.number_of_shards}
 */
void populateRemoteIndices(String clusterAlias, String indexName, int numShards) {
    final Client remote = client(clusterAlias);
    final var shardSettings = Settings.builder().put("index.number_of_shards", numShards);
    final var createIndex = remote.admin()
        .indices()
        .prepareCreate(indexName)
        .setSettings(shardSettings)
        .setMapping("id", "type=keyword", "tag", "type=keyword", "v", "type=long");
    assertAcked(createIndex);

    for (int docNumber = 0; docNumber < 10; docNumber++) {
        remote.prepareIndex(indexName).setSource("id", "remote-" + docNumber, "tag", "remote", "v", docNumber * docNumber).get();
    }
    remote.admin().indices().prepareRefresh(indexName).get();
}
/**
 * Supplies the per-remote {@code skip_unavailable} configuration, taken from the
 * {@code skipOverride} rule, and logs the map that is being used.
 */
@Override
protected Map<String, Boolean> skipUnavailableForRemoteClusters() {
    final Map<String, Boolean> skipUnavailable = skipOverride.getMap();
    LOGGER.info("Using skip_unavailable map: [{}]", skipUnavailable);
    return skipUnavailable;
}
}

View File

@ -0,0 +1,231 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.esql.action;
import org.elasticsearch.action.admin.cluster.stats.CCSTelemetrySnapshot;
import org.elasticsearch.action.admin.cluster.stats.CCSUsageTelemetry;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.SkipUnavailableRule;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.action.admin.cluster.stats.CCSUsageTelemetry.ASYNC_FEATURE;
import static org.hamcrest.Matchers.equalTo;
/**
 * Integration tests for the ES|QL cross-cluster usage telemetry, run with the
 * {@code EsqlPluginWithEnterpriseOrTrialLicense} plugin installed on every cluster.
 * <p>
 * Each test sends one or more queries to a single node of the local cluster and then checks the
 * telemetry snapshot read back from that node. Counters are cumulative within a test.
 */
public class CrossClustersUsageTelemetryIT extends AbstractCrossClustersUsageTelemetryIT {

    /** Query that targets the local cluster and, through the {@code c*} wildcard, the remote clusters. */
    private static final String LOCAL_AND_REMOTES_QUERY = "from logs-*,c*:logs-* | stats sum (v)";

    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
        List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins(clusterAlias));
        plugins.add(EsqlPluginWithEnterpriseOrTrialLicense.class);
        plugins.add(CrossClustersQueryIT.InternalExchangePlugin.class);
        return plugins;
    }

    /**
     * Asserts that a cluster took part in exactly {@code count} queries, was never skipped, and
     * has one "took" sample per query.
     *
     * @param perCluster telemetry entry of a single cluster
     * @param count      expected number of queries the cluster participated in
     */
    public void assertPerClusterCount(CCSTelemetrySnapshot.PerClusterCCSTelemetry perCluster, long count) {
        assertThat(perCluster.getCount(), equalTo(count));
        assertThat(perCluster.getSkippedCount(), equalTo(0L));
        assertThat(perCluster.getTook().count(), equalTo(count));
    }

    /**
     * Asserts the global counters expected after exactly one successful query that involved both
     * remotes and skipped none of them.
     */
    private void assertOneSuccessfulQuery(CCSTelemetrySnapshot telemetry) {
        assertThat(telemetry.getTotalCount(), equalTo(1L));
        assertThat(telemetry.getSuccessCount(), equalTo(1L));
        assertThat(telemetry.getFailureReasons().size(), equalTo(0));
        assertThat(telemetry.getTook().count(), equalTo(1L));
        assertThat(telemetry.getTookMrtFalse().count(), equalTo(0L));
        assertThat(telemetry.getTookMrtTrue().count(), equalTo(0L));
        assertThat(telemetry.getRemotesPerSearchAvg(), equalTo(2.0));
        assertThat(telemetry.getRemotesPerSearchMax(), equalTo(2L));
        assertThat(telemetry.getSearchCountWithSkippedRemotes(), equalTo(0L));
    }

    /**
     * Asserts that the per-cluster telemetry holds exactly one entry per remote cluster (plus one
     * for the local cluster when {@code includesLocal} is set) and that each of them has been
     * counted {@code count} times without being skipped.
     */
    private void assertEveryClusterCounted(CCSTelemetrySnapshot telemetry, long count, boolean includesLocal) {
        var perCluster = telemetry.getByRemoteCluster();
        int expectedClusters = remoteClusterAlias().size() + (includesLocal ? 1 : 0);
        assertThat(perCluster.size(), equalTo(expectedClusters));
        for (String clusterAlias : remoteClusterAlias()) {
            assertPerClusterCount(perCluster.get(clusterAlias), count);
        }
        if (includesLocal) {
            assertPerClusterCount(perCluster.get(LOCAL_CLUSTER), count);
        }
    }

    public void testLocalRemote() throws Exception {
        setupClusters();
        var telemetry = getTelemetryFromQuery(LOCAL_AND_REMOTES_QUERY, "kibana");
        assertOneSuccessfulQuery(telemetry);
        assertThat(telemetry.getClientCounts().size(), equalTo(1));
        assertThat(telemetry.getClientCounts().get("kibana"), equalTo(1L));
        assertThat(telemetry.getFeatureCounts().get(ASYNC_FEATURE), equalTo(null));
        assertEveryClusterCounted(telemetry, 1L, true);

        // Same query again: all counters on the same node go up by one
        telemetry = getTelemetryFromQuery(LOCAL_AND_REMOTES_QUERY, "kibana");
        assertThat(telemetry.getTotalCount(), equalTo(2L));
        assertThat(telemetry.getClientCounts().get("kibana"), equalTo(2L));
        assertEveryClusterCounted(telemetry, 2L, true);
    }

    public void testLocalOnly() throws Exception {
        setupClusters();
        // Should not produce any usage info since it's a local search
        var telemetry = getTelemetryFromQuery("from logs-* | stats sum (v)", "kibana");
        assertThat(telemetry.getTotalCount(), equalTo(0L));
        assertThat(telemetry.getSuccessCount(), equalTo(0L));
        assertThat(telemetry.getByRemoteCluster().size(), equalTo(0));
    }

    @SkipUnavailableRule.NotSkipped(aliases = REMOTE1)
    public void testFailed() throws Exception {
        setupClusters();
        // Should not produce any usage info since it's a local search
        var telemetry = getTelemetryFromFailedQuery("from no_such_index | stats sum (v)");
        assertThat(telemetry.getTotalCount(), equalTo(0L));
        assertThat(telemetry.getSuccessCount(), equalTo(0L));
        assertThat(telemetry.getByRemoteCluster().size(), equalTo(0));

        // One remote is skipped, one is not
        telemetry = getTelemetryFromFailedQuery("from logs-*,c*:no_such_index | stats sum (v)");
        assertThat(telemetry.getTotalCount(), equalTo(1L));
        assertThat(telemetry.getSuccessCount(), equalTo(0L));
        assertThat(telemetry.getByRemoteCluster().size(), equalTo(1));
        assertThat(telemetry.getRemotesPerSearchAvg(), equalTo(2.0));
        assertThat(telemetry.getRemotesPerSearchMax(), equalTo(2L));
        assertThat(telemetry.getSearchCountWithSkippedRemotes(), equalTo(1L));
        Map<String, Long> expectedFailure = Map.of(CCSUsageTelemetry.Result.NOT_FOUND.getName(), 1L);
        assertThat(telemetry.getFailureReasons(), equalTo(expectedFailure));
        // cluster-b should be skipped
        assertThat(telemetry.getByRemoteCluster().get(REMOTE2).getCount(), equalTo(0L));
        assertThat(telemetry.getByRemoteCluster().get(REMOTE2).getSkippedCount(), equalTo(1L));

        // This query is only for cluster-a, so it adds no skipped remotes: the skipped-remotes
        // counter asserted below is still the 1 left by the previous query.
        telemetry = getTelemetryFromFailedQuery("from logs-*,cluster-a:no_such_index | stats sum (v)");
        assertThat(telemetry.getTotalCount(), equalTo(2L));
        assertThat(telemetry.getSuccessCount(), equalTo(0L));
        assertThat(telemetry.getByRemoteCluster().size(), equalTo(1));
        assertThat(telemetry.getRemotesPerSearchAvg(), equalTo(2.0));
        assertThat(telemetry.getRemotesPerSearchMax(), equalTo(2L));
        assertThat(telemetry.getSearchCountWithSkippedRemotes(), equalTo(1L));
        expectedFailure = Map.of(CCSUsageTelemetry.Result.NOT_FOUND.getName(), 2L);
        assertThat(telemetry.getFailureReasons(), equalTo(expectedFailure));
    }

    // TODO: enable when skip-up patch is merged
    // public void testSkipAllRemotes() throws Exception {
    // var telemetry = getTelemetryFromQuery("from logs-*,c*:no_such_index | stats sum (v)", "unknown");
    //
    // assertThat(telemetry.getTotalCount(), equalTo(1L));
    // assertThat(telemetry.getSuccessCount(), equalTo(1L));
    // assertThat(telemetry.getFailureReasons().size(), equalTo(0));
    // assertThat(telemetry.getTook().count(), equalTo(1L));
    // assertThat(telemetry.getTookMrtFalse().count(), equalTo(0L));
    // assertThat(telemetry.getTookMrtTrue().count(), equalTo(0L));
    // assertThat(telemetry.getRemotesPerSearchAvg(), equalTo(2.0));
    // assertThat(telemetry.getRemotesPerSearchMax(), equalTo(2L));
    // assertThat(telemetry.getSearchCountWithSkippedRemotes(), equalTo(1L));
    // assertThat(telemetry.getClientCounts().size(), equalTo(0));
    //
    // var perCluster = telemetry.getByRemoteCluster();
    // assertThat(perCluster.size(), equalTo(3));
    // for (String clusterAlias : remoteClusterAlias()) {
    // var clusterData = perCluster.get(clusterAlias);
    // assertThat(clusterData.getCount(), equalTo(0L));
    // assertThat(clusterData.getSkippedCount(), equalTo(1L));
    // assertThat(clusterData.getTook().count(), equalTo(0L));
    // }
    // assertPerClusterCount(perCluster.get(LOCAL_CLUSTER), 1L);
    // }

    public void testRemoteOnly() throws Exception {
        setupClusters();
        var telemetry = getTelemetryFromQuery("from c*:logs-* | stats sum (v)", "kibana");
        assertOneSuccessfulQuery(telemetry);
        assertThat(telemetry.getClientCounts().size(), equalTo(1));
        assertThat(telemetry.getClientCounts().get("kibana"), equalTo(1L));
        assertThat(telemetry.getFeatureCounts().get(ASYNC_FEATURE), equalTo(null));
        // Only the two remotes are recorded; there is no entry for the local cluster
        assertEveryClusterCounted(telemetry, 1L, false);
    }

    public void testAsync() throws Exception {
        setupClusters();
        var telemetry = getTelemetryFromAsyncQuery(LOCAL_AND_REMOTES_QUERY);
        assertOneSuccessfulQuery(telemetry);
        // The async helper sends no product-origin header, so no client is recorded
        assertThat(telemetry.getClientCounts().size(), equalTo(0));
        assertThat(telemetry.getFeatureCounts().get(ASYNC_FEATURE), equalTo(1L));
        assertEveryClusterCounted(telemetry, 1L, true);

        // do it again
        telemetry = getTelemetryFromAsyncQuery(LOCAL_AND_REMOTES_QUERY);
        assertThat(telemetry.getTotalCount(), equalTo(2L));
        assertThat(telemetry.getFeatureCounts().get(ASYNC_FEATURE), equalTo(2L));
        assertEveryClusterCounted(telemetry, 2L, true);
    }

    public void testNoSuchCluster() throws Exception {
        setupClusters();
        // This is not recognized as a cross-cluster search
        var telemetry = getTelemetryFromFailedQuery("from c*:logs*, nocluster:nomatch | stats sum (v)");
        assertThat(telemetry.getTotalCount(), equalTo(0L));
        assertThat(telemetry.getSuccessCount(), equalTo(0L));
        assertThat(telemetry.getByRemoteCluster().size(), equalTo(0));
    }

    @SkipUnavailableRule.NotSkipped(aliases = REMOTE1)
    public void testDisconnect() throws Exception {
        setupClusters();
        // Disconnect remote1
        cluster(REMOTE1).close();
        var telemetry = getTelemetryFromFailedQuery("from logs-*,cluster-a:logs-* | stats sum (v)");
        assertThat(telemetry.getTotalCount(), equalTo(1L));
        assertThat(telemetry.getSuccessCount(), equalTo(0L));
        Map<String, Long> expectedFailure = Map.of(CCSUsageTelemetry.Result.REMOTES_UNAVAILABLE.getName(), 1L);
        assertThat(telemetry.getFailureReasons(), equalTo(expectedFailure));
    }
}

View File

@ -0,0 +1,42 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.esql.action;
import org.elasticsearch.action.admin.cluster.stats.CCSUsageTelemetry;
import org.elasticsearch.plugins.Plugin;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
/**
 * Checks the ES|QL cross-cluster usage telemetry when every cluster runs with the
 * {@code EsqlPluginWithNonEnterpriseOrExpiredLicense} plugin: a cross-cluster query is expected
 * to fail and to be recorded as a license failure.
 */
public class CrossClustersUsageTelemetryNoLicenseIT extends AbstractCrossClustersUsageTelemetryIT {

    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
        final List<Class<? extends Plugin>> withEsqlPlugins = new ArrayList<>(super.nodePlugins(clusterAlias));
        withEsqlPlugins.add(EsqlPluginWithNonEnterpriseOrExpiredLicense.class);
        withEsqlPlugins.add(CrossClustersQueryIT.InternalExchangePlugin.class);
        return withEsqlPlugins;
    }

    public void testLicenseFailure() throws Exception {
        setupClusters();
        final var snapshot = getTelemetryFromFailedQuery("from logs-*,c*:logs-* | stats sum (v)");

        // The query is counted, but as a failure and without a "took" sample
        assertThat(snapshot.getTotalCount(), equalTo(1L));
        assertThat(snapshot.getSuccessCount(), equalTo(0L));
        assertThat(snapshot.getTook().count(), equalTo(0L));

        // Both remotes are still counted for the failed query
        assertThat(snapshot.getRemotesPerSearchAvg(), equalTo(2.0));
        assertThat(snapshot.getRemotesPerSearchMax(), equalTo(2L));

        final Map<String, Long> licenseFailureOnly = Map.of(CCSUsageTelemetry.Result.LICENSE.getName(), 1L);
        assertThat(snapshot.getFailureReasons(), equalTo(licenseFailureOnly));
    }
}

View File

@ -206,6 +206,10 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable {
return clusterInfo.get(clusterAlias);
}
/**
 * Returns the map of per-cluster execution information keyed by cluster alias.
 * This is the {@code clusterInfo} field itself, not a copy.
 */
public Map<String, Cluster> getClusters() {
return clusterInfo;
}
/**
* Utility to swap a Cluster object. Guidelines for the remapping function:
* <ul>

View File

@ -80,7 +80,8 @@ public class PlanExecutor {
);
QueryMetric clientId = QueryMetric.fromString("rest");
metrics.total(clientId);
session.execute(request, executionInfo, planRunner, wrap(x -> {
ActionListener<Result> executeListener = wrap(x -> {
planningMetricsManager.publish(planningMetrics, true);
listener.onResponse(x);
}, ex -> {
@ -88,7 +89,10 @@ public class PlanExecutor {
metrics.failed(clientId);
planningMetricsManager.publish(planningMetrics, false);
listener.onFailure(ex);
}));
});
// Wrap it in a listener so that if we have any exceptions during execution, the listener picks it up
// and all the metrics are properly updated
ActionListener.run(executeListener, l -> session.execute(request, executionInfo, planRunner, l));
}
public IndexResolver indexResolver() {

View File

@ -9,6 +9,8 @@ package org.elasticsearch.xpack.esql.plugin;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.admin.cluster.stats.CCSUsage;
import org.elasticsearch.action.admin.cluster.stats.CCSUsageTelemetry;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.internal.Client;
@ -20,16 +22,20 @@ import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.usage.UsageService;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
import org.elasticsearch.xpack.esql.VerificationException;
import org.elasticsearch.xpack.esql.action.ColumnInfoImpl;
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
import org.elasticsearch.xpack.esql.action.EsqlQueryAction;
@ -52,6 +58,7 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN;
@ -71,6 +78,7 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
private final AsyncTaskManagementService<EsqlQueryRequest, EsqlQueryResponse, EsqlQueryTask> asyncTaskManagementService;
private final RemoteClusterService remoteClusterService;
private final QueryBuilderResolver queryBuilderResolver;
private final UsageService usageService;
@Inject
@SuppressWarnings("this-escape")
@ -86,8 +94,8 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
BlockFactory blockFactory,
Client client,
NamedWriteableRegistry registry,
IndexNameExpressionResolver indexNameExpressionResolver
IndexNameExpressionResolver indexNameExpressionResolver,
UsageService usageService
) {
// TODO replace SAME when removing workaround for https://github.com/elastic/elasticsearch/issues/97916
super(EsqlQueryAction.NAME, transportService, actionFilters, EsqlQueryRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
@ -126,6 +134,7 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
);
this.remoteClusterService = transportService.getRemoteClusterService();
this.queryBuilderResolver = new QueryBuilderResolver(searchService, clusterService, transportService, indexNameExpressionResolver);
this.usageService = usageService;
}
@Override
@ -197,8 +206,65 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
remoteClusterService,
planRunner,
queryBuilderResolver,
listener.map(result -> toResponse(task, request, configuration, result))
ActionListener.wrap(result -> {
recordCCSTelemetry(task, executionInfo, request, null);
listener.onResponse(toResponse(task, request, configuration, result));
}, ex -> {
recordCCSTelemetry(task, executionInfo, request, ex);
listener.onFailure(ex);
})
);
}
/**
 * Records one cross-cluster usage entry for a finished ES|QL query in the ES|QL usage holder of
 * the {@code usageService}. Does nothing when the execution was not a cross-cluster search.
 *
 * @param task          the task running the query; used to set the client on the usage entry
 * @param executionInfo execution info holding the overall "took" time and the per-cluster state
 * @param request       the original request; used to flag async queries
 * @param exception     the failure that ended the query, or {@code null} if it succeeded
 */
private void recordCCSTelemetry(Task task, EsqlExecutionInfo executionInfo, EsqlQueryRequest request, @Nullable Exception exception) {
    final boolean crossCluster = executionInfo.isCrossClusterSearch();
    if (crossCluster == false) {
        return;
    }

    final CCSUsage.Builder usage = new CCSUsage.Builder();
    usage.setClientFromTask(task);

    if (exception != null) {
        // Verification failures get a dedicated classification first; everything else, and any
        // verification failure that stays UNKNOWN, is handed to the builder as the raw exception.
        final CCSUsageTelemetry.Result classified = exception instanceof VerificationException ve
            ? classifyVerificationException(ve)
            : CCSUsageTelemetry.Result.UNKNOWN;
        if (classified == CCSUsageTelemetry.Result.UNKNOWN) {
            usage.setFailure(exception);
        } else {
            usage.setFailure(classified);
        }
    }

    final var overallTook = executionInfo.overallTook();
    if (overallTook != null) {
        usage.took(overallTook.getMillis());
    }
    if (request.async()) {
        usage.setFeature(CCSUsageTelemetry.ASYNC_FEATURE);
    }

    final AtomicInteger remotes = new AtomicInteger();
    executionInfo.getClusters().forEach((clusterAlias, cluster) -> {
        final boolean skipped = cluster.getStatus() == EsqlExecutionInfo.Cluster.Status.SKIPPED;
        if (skipped) {
            usage.skippedRemote(clusterAlias);
        } else {
            usage.perClusterUsage(clusterAlias, cluster.getTook());
        }
        // Every entry except the local cluster counts as a remote, skipped or not.
        final boolean local = clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
        if (local == false) {
            remotes.incrementAndGet();
        }
    });
    final int remotesCount = remotes.get();
    assert remotesCount > 0 : "Got cross-cluster search telemetry without any remote clusters";
    usage.setRemotesCount(remotesCount);

    usageService.getEsqlUsageHolder().updateUsage(usage.build());
}
/**
 * Maps a verification failure to a telemetry result based on its detailed message.
 *
 * @param exception the verification failure to classify
 * @return {@code NOT_FOUND} when the detailed message contains {@code "Unknown index"},
 *         otherwise {@code UNKNOWN}
 */
private CCSUsageTelemetry.Result classifyVerificationException(VerificationException exception) {
    final boolean unknownIndex = exception.getDetailedMessage().contains("Unknown index");
    return unknownIndex ? CCSUsageTelemetry.Result.NOT_FOUND : CCSUsageTelemetry.Result.UNKNOWN;
}
private EsqlExecutionInfo getOrCreateExecutionInfo(Task task, EsqlQueryRequest request) {

View File

@ -312,7 +312,7 @@ public class EsqlSession {
.collect(Collectors.toSet());
final List<TableInfo> indices = preAnalysis.indices;
EsqlSessionCCSUtils.checkForCcsLicense(indices, indicesExpressionGrouper, verifier.licenseState());
EsqlSessionCCSUtils.checkForCcsLicense(executionInfo, indices, indicesExpressionGrouper, verifier.licenseState());
final Set<String> targetClusters = enrichPolicyResolver.groupIndicesPerCluster(
indices.stream().flatMap(t -> Arrays.stream(Strings.commaDelimitedListToStringArray(t.id().index()))).toArray(String[]::new)

View File

@ -308,6 +308,7 @@ class EsqlSessionCCSUtils {
* @throws org.elasticsearch.ElasticsearchStatusException if the license is not valid (or present) for ES|QL CCS search.
*/
public static void checkForCcsLicense(
EsqlExecutionInfo executionInfo,
List<TableInfo> indices,
IndicesExpressionGrouper indicesGrouper,
XPackLicenseState licenseState
@ -326,6 +327,17 @@ class EsqlSessionCCSUtils {
// check if it is a cross-cluster query
if (groupedIndices.size() > 1 || groupedIndices.containsKey(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY) == false) {
if (EsqlLicenseChecker.isCcsAllowed(licenseState) == false) {
// initialize the cluster entries in EsqlExecutionInfo before throwing the invalid license error
// so that the CCS telemetry handler can recognize that this error is CCS-related
for (Map.Entry<String, OriginalIndices> entry : groupedIndices.entrySet()) {
executionInfo.swapCluster(
entry.getKey(),
(k, v) -> new EsqlExecutionInfo.Cluster(
entry.getKey(),
Strings.arrayToCommaDelimitedString(entry.getValue().indices())
)
);
}
throw EsqlLicenseChecker.invalidLicenseForCcsException(licenseState);
}
}

View File

@ -644,6 +644,7 @@ public class EsqlSessionCCSUtilsTests extends ESTestCase {
public void testCheckForCcsLicense() {
final TestIndicesExpressionGrouper indicesGrouper = new TestIndicesExpressionGrouper();
EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
// this seems to be used only for tracking usage of features, not for checking if a license is expired
final LongSupplier currTime = () -> System.currentTimeMillis();
@ -671,22 +672,22 @@ public class EsqlSessionCCSUtilsTests extends ESTestCase {
List<TableInfo> indices = new ArrayList<>();
indices.add(new TableInfo(new TableIdentifier(EMPTY, null, randomFrom("idx", "idx1,idx2*"))));
checkForCcsLicense(indices, indicesGrouper, enterpriseLicenseValid);
checkForCcsLicense(indices, indicesGrouper, platinumLicenseValid);
checkForCcsLicense(indices, indicesGrouper, goldLicenseValid);
checkForCcsLicense(indices, indicesGrouper, trialLicenseValid);
checkForCcsLicense(indices, indicesGrouper, basicLicenseValid);
checkForCcsLicense(indices, indicesGrouper, standardLicenseValid);
checkForCcsLicense(indices, indicesGrouper, missingLicense);
checkForCcsLicense(indices, indicesGrouper, nullLicense);
checkForCcsLicense(executionInfo, indices, indicesGrouper, enterpriseLicenseValid);
checkForCcsLicense(executionInfo, indices, indicesGrouper, platinumLicenseValid);
checkForCcsLicense(executionInfo, indices, indicesGrouper, goldLicenseValid);
checkForCcsLicense(executionInfo, indices, indicesGrouper, trialLicenseValid);
checkForCcsLicense(executionInfo, indices, indicesGrouper, basicLicenseValid);
checkForCcsLicense(executionInfo, indices, indicesGrouper, standardLicenseValid);
checkForCcsLicense(executionInfo, indices, indicesGrouper, missingLicense);
checkForCcsLicense(executionInfo, indices, indicesGrouper, nullLicense);
checkForCcsLicense(indices, indicesGrouper, enterpriseLicenseInactive);
checkForCcsLicense(indices, indicesGrouper, platinumLicenseInactive);
checkForCcsLicense(indices, indicesGrouper, goldLicenseInactive);
checkForCcsLicense(indices, indicesGrouper, trialLicenseInactive);
checkForCcsLicense(indices, indicesGrouper, basicLicenseInactive);
checkForCcsLicense(indices, indicesGrouper, standardLicenseInactive);
checkForCcsLicense(indices, indicesGrouper, missingLicenseInactive);
checkForCcsLicense(executionInfo, indices, indicesGrouper, enterpriseLicenseInactive);
checkForCcsLicense(executionInfo, indices, indicesGrouper, platinumLicenseInactive);
checkForCcsLicense(executionInfo, indices, indicesGrouper, goldLicenseInactive);
checkForCcsLicense(executionInfo, indices, indicesGrouper, trialLicenseInactive);
checkForCcsLicense(executionInfo, indices, indicesGrouper, basicLicenseInactive);
checkForCcsLicense(executionInfo, indices, indicesGrouper, standardLicenseInactive);
checkForCcsLicense(executionInfo, indices, indicesGrouper, missingLicenseInactive);
}
// cross-cluster search requires a valid (active, non-expired) enterprise license OR a valid trial license
@ -701,8 +702,8 @@ public class EsqlSessionCCSUtilsTests extends ESTestCase {
}
// licenses that work
checkForCcsLicense(indices, indicesGrouper, enterpriseLicenseValid);
checkForCcsLicense(indices, indicesGrouper, trialLicenseValid);
checkForCcsLicense(executionInfo, indices, indicesGrouper, enterpriseLicenseValid);
checkForCcsLicense(executionInfo, indices, indicesGrouper, trialLicenseValid);
// all others fail ---
@ -739,9 +740,10 @@ public class EsqlSessionCCSUtilsTests extends ESTestCase {
XPackLicenseState licenseState,
String expectedErrorMessageSuffix
) {
EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
ElasticsearchStatusException e = expectThrows(
ElasticsearchStatusException.class,
() -> checkForCcsLicense(indices, indicesGrouper, licenseState)
() -> checkForCcsLicense(executionInfo, indices, indicesGrouper, licenseState)
);
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(