From e27a50dead323268e3392649ed43d61acf0c293c Mon Sep 17 00:00:00 2001 From: Niels Bauman <33722607+nielsbauman@users.noreply.github.com> Date: Tue, 4 Feb 2025 09:30:44 +1000 Subject: [PATCH] Run `TransportEnrichStatsAction` on local node (#121256) This action solely needs the cluster state, it can run on any node. Additionally, it needs to be cancellable to avoid doing unnecessary work after a client failure or timeout. --- docs/changelog/121256.yaml | 5 + .../rest-api-spec/api/enrich.stats.json | 2 +- .../core/enrich/action/EnrichStatsAction.java | 40 ++++--- .../EnrichRestActionCancellationIT.java | 5 + .../action/TransportEnrichStatsAction.java | 36 ++++--- .../enrich/rest/RestEnrichStatsAction.java | 7 +- .../action/EnrichStatsResponseTests.java | 101 ------------------ .../enrich/EnrichStatsCollectorTests.java | 35 +++++- .../enrich/ExecutingPolicyDocTests.java | 2 +- 9 files changed, 103 insertions(+), 130 deletions(-) create mode 100644 docs/changelog/121256.yaml delete mode 100644 x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/EnrichStatsResponseTests.java diff --git a/docs/changelog/121256.yaml b/docs/changelog/121256.yaml new file mode 100644 index 000000000000..b4ba7fb3d014 --- /dev/null +++ b/docs/changelog/121256.yaml @@ -0,0 +1,5 @@ +pr: 121256 +summary: Run `TransportEnrichStatsAction` on local node +area: Ingest Node +type: enhancement +issues: [] diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/enrich.stats.json b/rest-api-spec/src/main/resources/rest-api-spec/api/enrich.stats.json index afd314a0dc80..835fa3f6ffef 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/enrich.stats.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/enrich.stats.json @@ -20,7 +20,7 @@ "params": { "master_timeout":{ "type":"time", - "description":"Timeout for processing on master node" + "description":"Timeout for waiting for new cluster state in case it is blocked" } } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/EnrichStatsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/EnrichStatsAction.java index 36322ed6c6cb..4de62d0fd499 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/EnrichStatsAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/EnrichStatsAction.java @@ -10,12 +10,16 @@ import org.elasticsearch.TransportVersions; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionType; -import org.elasticsearch.action.support.master.MasterNodeRequest; +import org.elasticsearch.action.support.local.LocalClusterStateRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.core.UpdateForV10; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.xcontent.ToXContentFragment; import org.elasticsearch.xcontent.ToXContentObject; @@ -23,6 +27,7 @@ import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; import java.util.List; +import java.util.Map; import java.util.Objects; public class EnrichStatsAction extends ActionType { @@ -34,12 +39,17 @@ public class EnrichStatsAction extends ActionType { super(NAME); } - public static class Request extends MasterNodeRequest { + public static class Request extends LocalClusterStateRequest { public Request(TimeValue masterNodeTimeout) { super(masterNodeTimeout); } + /** + * NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to read these requests until + * we no longer need to support calling this action remotely. + */ + @UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT) public Request(StreamInput in) throws IOException { super(in); } @@ -48,6 +58,11 @@ public class EnrichStatsAction extends ActionType { public ActionRequestValidationException validate() { return null; } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(id, type, action, "", parentTaskId, headers); + } } public static class Response extends ActionResponse implements ToXContentObject { @@ -62,13 +77,6 @@ public class EnrichStatsAction extends ActionType { this.cacheStats = cacheStats; } - public Response(StreamInput in) throws IOException { - super(in); - executingPolicies = in.readCollectionAsList(ExecutingPolicy::new); - coordinatorStats = in.readCollectionAsList(CoordinatorStats::new); - cacheStats = in.readCollectionAsList(CacheStats::new); - } - public List getExecutingPolicies() { return executingPolicies; } @@ -81,6 +89,11 @@ public class EnrichStatsAction extends ActionType { return cacheStats; } + /** + * NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to write these responses until + * we no longer need to support calling this action remotely. + */ + @UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT) @Override public void writeTo(StreamOutput out) throws IOException { out.writeCollection(executingPolicies); @@ -167,10 +180,11 @@ public class EnrichStatsAction extends ActionType { public record ExecutingPolicy(String name, TaskInfo taskInfo) implements Writeable, ToXContentFragment { - ExecutingPolicy(StreamInput in) throws IOException { - this(in.readString(), TaskInfo.from(in)); - } - + /** + * NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to write these responses until + * we no longer need to support calling this action remotely. + */ + @UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT) @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(name); diff --git a/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichRestActionCancellationIT.java b/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichRestActionCancellationIT.java index a75dd26eacee..29808e4a9136 100644 --- a/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichRestActionCancellationIT.java +++ b/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichRestActionCancellationIT.java @@ -23,6 +23,7 @@ import org.elasticsearch.rest.root.MainRestPlugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.rest.ObjectPath; import org.elasticsearch.transport.netty4.Netty4Plugin; +import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction; import org.elasticsearch.xpack.core.enrich.action.GetEnrichPolicyAction; import java.io.IOException; @@ -62,6 +63,10 @@ public class EnrichRestActionCancellationIT extends ESIntegTestCase { runRestActionCancellationTest(new Request(HttpGet.METHOD_NAME, "/_enrich/policy"), GetEnrichPolicyAction.NAME); } + public void testEnrichStatsCancellation() throws IOException { + runRestActionCancellationTest(new Request(HttpGet.METHOD_NAME, "/_enrich/_stats"), EnrichStatsAction.NAME); + } + private void runRestActionCancellationTest(Request request, String actionName) { final var node = usually() ? internalCluster().getRandomNodeName() : internalCluster().startCoordinatingOnlyNode(Settings.EMPTY); diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportEnrichStatsAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportEnrichStatsAction.java index b78aaa28428c..380b64ea365f 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportEnrichStatsAction.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportEnrichStatsAction.java @@ -9,17 +9,18 @@ package org.elasticsearch.xpack.enrich.action; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.action.support.ChannelActionListener; +import org.elasticsearch.action.support.local.TransportLocalClusterStateAction; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.UpdateForV10; import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; -import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction; import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.CoordinatorStats; @@ -31,34 +32,44 @@ import java.util.List; import java.util.Objects; import java.util.stream.Collectors; -public class TransportEnrichStatsAction extends TransportMasterNodeAction { +public class TransportEnrichStatsAction extends TransportLocalClusterStateAction { private final Client client; + /** + * NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC it must be registered with the TransportService until + * we no longer need to support calling this action remotely. + */ + @UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT) + @SuppressWarnings("this-escape") @Inject public TransportEnrichStatsAction( TransportService transportService, ClusterService clusterService, - ThreadPool threadPool, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver, Client client ) { super( EnrichStatsAction.NAME, - transportService, - clusterService, - threadPool, actionFilters, - EnrichStatsAction.Request::new, - EnrichStatsAction.Response::new, + transportService.getTaskManager(), + clusterService, EsExecutors.DIRECT_EXECUTOR_SERVICE ); this.client = client; + + transportService.registerRequestHandler( + actionName, + executor, + false, + true, + EnrichStatsAction.Request::new, + (request, channel, task) -> executeDirect(task, request, new ChannelActionListener<>(channel)) + ); } @Override - protected void masterOperation( + protected void localClusterStateOperation( Task task, EnrichStatsAction.Request request, ClusterState state, @@ -101,6 +112,7 @@ public class TransportEnrichStatsAction extends TransportMasterNodeAction client.execute(EnrichStatsAction.INSTANCE, request, new RestToXContentListener<>(channel)); + return channel -> new RestCancellableNodeClient(client, restRequest.getHttpChannel()).execute( + EnrichStatsAction.INSTANCE, + request, + new RestToXContentListener<>(channel) + ); } } diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/EnrichStatsResponseTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/EnrichStatsResponseTests.java deleted file mode 100644 index aec184472d41..000000000000 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/EnrichStatsResponseTests.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ -package org.elasticsearch.xpack.enrich.action; - -import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.tasks.TaskId; -import org.elasticsearch.tasks.TaskInfo; -import org.elasticsearch.test.AbstractWireSerializingTestCase; -import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction; -import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.CacheStats; -import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.CoordinatorStats; -import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.ExecutingPolicy; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; - -public class EnrichStatsResponseTests extends AbstractWireSerializingTestCase { - - @Override - protected EnrichStatsAction.Response createTestInstance() { - int numExecutingPolicies = randomIntBetween(0, 16); - List executingPolicies = new ArrayList<>(numExecutingPolicies); - for (int i = 0; i < numExecutingPolicies; i++) { - TaskInfo taskInfo = randomTaskInfo(); - executingPolicies.add(new ExecutingPolicy(randomAlphaOfLength(4), taskInfo)); - } - int numCoordinatingStats = randomIntBetween(0, 16); - List coordinatorStats = new ArrayList<>(numCoordinatingStats); - List cacheStats = new ArrayList<>(numCoordinatingStats); - for (int i = 0; i < numCoordinatingStats; i++) { - String nodeId = randomAlphaOfLength(4); - CoordinatorStats stats = new CoordinatorStats( - nodeId, - randomIntBetween(0, 8096), - randomIntBetween(0, 8096), - randomNonNegativeLong(), - randomNonNegativeLong() - ); - coordinatorStats.add(stats); - cacheStats.add( - new CacheStats( - nodeId, - randomNonNegativeLong(), - randomNonNegativeLong(), - randomNonNegativeLong(), - randomNonNegativeLong(), - randomNonNegativeLong(), - randomNonNegativeLong(), - randomNonNegativeLong() - ) - ); - } - return new EnrichStatsAction.Response(executingPolicies, coordinatorStats, cacheStats); - } - - @Override - protected EnrichStatsAction.Response mutateInstance(EnrichStatsAction.Response instance) { - return null;// TODO implement https://github.com/elastic/elasticsearch/issues/25929 - } - - @Override - protected Writeable.Reader instanceReader() { - return EnrichStatsAction.Response::new; - } - - public static TaskInfo randomTaskInfo() { - String nodeId = randomAlphaOfLength(5); - TaskId taskId = new TaskId(nodeId, randomLong()); - String type = randomAlphaOfLength(5); - String action = randomAlphaOfLength(5); - String description = randomAlphaOfLength(5); - long startTime = randomLong(); - long runningTimeNanos = randomNonNegativeLong(); - boolean cancellable = randomBoolean(); - boolean cancelled = cancellable && randomBoolean(); - TaskId parentTaskId = TaskId.EMPTY_TASK_ID; - Map headers = randomBoolean() - ? Collections.emptyMap() - : Collections.singletonMap(randomAlphaOfLength(5), randomAlphaOfLength(5)); - return new TaskInfo( - taskId, - type, - nodeId, - action, - description, - null, - startTime, - runningTimeNanos, - cancellable, - cancelled, - parentTaskId, - headers - ); - } -} diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/monitoring/collector/enrich/EnrichStatsCollectorTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/monitoring/collector/enrich/EnrichStatsCollectorTests.java index 2a069eb59676..b7537089413d 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/monitoring/collector/enrich/EnrichStatsCollectorTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/monitoring/collector/enrich/EnrichStatsCollectorTests.java @@ -13,6 +13,8 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.TimeValue; import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction; import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.CoordinatorStats; import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.ExecutingPolicy; @@ -21,9 +23,10 @@ import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc; import org.elasticsearch.xpack.monitoring.BaseCollectorTestCase; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Map; -import static org.elasticsearch.xpack.enrich.action.EnrichStatsResponseTests.randomTaskInfo; import static org.elasticsearch.xpack.monitoring.MonitoringTestUtils.randomMonitoringNode; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -154,4 +157,34 @@ public class EnrichStatsCollectorTests extends BaseCollectorTestCase { return new EnrichStatsCollector(clusterService, licenseState, client); } + public static TaskInfo randomTaskInfo() { + String nodeId = randomAlphaOfLength(5); + TaskId taskId = new TaskId(nodeId, randomLong()); + String type = randomAlphaOfLength(5); + String action = randomAlphaOfLength(5); + String description = randomAlphaOfLength(5); + long startTime = randomLong(); + long runningTimeNanos = randomNonNegativeLong(); + boolean cancellable = randomBoolean(); + boolean cancelled = cancellable && randomBoolean(); + TaskId parentTaskId = TaskId.EMPTY_TASK_ID; + Map headers = randomBoolean() + ? Collections.emptyMap() + : Collections.singletonMap(randomAlphaOfLength(5), randomAlphaOfLength(5)); + return new TaskInfo( + taskId, + type, + nodeId, + action, + description, + null, + startTime, + runningTimeNanos, + cancellable, + cancelled, + parentTaskId, + headers + ); + } + } diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/monitoring/collector/enrich/ExecutingPolicyDocTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/monitoring/collector/enrich/ExecutingPolicyDocTests.java index 3352e6e2bb8a..72dccf77f391 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/monitoring/collector/enrich/ExecutingPolicyDocTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/monitoring/collector/enrich/ExecutingPolicyDocTests.java @@ -23,8 +23,8 @@ import java.util.Map; import java.util.Optional; import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; -import static org.elasticsearch.xpack.enrich.action.EnrichStatsResponseTests.randomTaskInfo; import static org.elasticsearch.xpack.monitoring.collector.enrich.EnrichCoordinatorDocTests.DATE_TIME_FORMATTER; +import static org.elasticsearch.xpack.monitoring.collector.enrich.EnrichStatsCollectorTests.randomTaskInfo; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is;