Run `TransportEnrichStatsAction` on local node (#121256)

This action solely needs the cluster state, it can run on any node.
Additionally, it needs to be cancellable to avoid doing unnecessary work
after a client failure or timeout.
This commit is contained in:
Niels Bauman 2025-02-04 09:30:44 +10:00 committed by GitHub
parent c717e78e0f
commit e27a50dead
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 103 additions and 130 deletions

View File

@ -0,0 +1,5 @@
pr: 121256
summary: Run `TransportEnrichStatsAction` on local node
area: Ingest Node
type: enhancement
issues: []

View File

@ -20,7 +20,7 @@
"params": {
"master_timeout":{
"type":"time",
"description":"Timeout for processing on master node"
"description":"Timeout for waiting for new cluster state in case it is blocked"
}
}
}

View File

@ -10,12 +10,16 @@ import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.local.LocalClusterStateRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.xcontent.ToXContentFragment;
import org.elasticsearch.xcontent.ToXContentObject;
@ -23,6 +27,7 @@ import org.elasticsearch.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
public class EnrichStatsAction extends ActionType<EnrichStatsAction.Response> {
@ -34,12 +39,17 @@ public class EnrichStatsAction extends ActionType<EnrichStatsAction.Response> {
super(NAME);
}
public static class Request extends MasterNodeRequest<Request> {
public static class Request extends LocalClusterStateRequest {
public Request(TimeValue masterNodeTimeout) {
super(masterNodeTimeout);
}
/**
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to read these requests until
* we no longer need to support calling this action remotely.
*/
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
public Request(StreamInput in) throws IOException {
super(in);
}
@ -48,6 +58,11 @@ public class EnrichStatsAction extends ActionType<EnrichStatsAction.Response> {
public ActionRequestValidationException validate() {
return null;
}
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new CancellableTask(id, type, action, "", parentTaskId, headers);
}
}
public static class Response extends ActionResponse implements ToXContentObject {
@ -62,13 +77,6 @@ public class EnrichStatsAction extends ActionType<EnrichStatsAction.Response> {
this.cacheStats = cacheStats;
}
public Response(StreamInput in) throws IOException {
super(in);
executingPolicies = in.readCollectionAsList(ExecutingPolicy::new);
coordinatorStats = in.readCollectionAsList(CoordinatorStats::new);
cacheStats = in.readCollectionAsList(CacheStats::new);
}
public List<ExecutingPolicy> getExecutingPolicies() {
return executingPolicies;
}
@ -81,6 +89,11 @@ public class EnrichStatsAction extends ActionType<EnrichStatsAction.Response> {
return cacheStats;
}
/**
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to write these responses until
* we no longer need to support calling this action remotely.
*/
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeCollection(executingPolicies);
@ -167,10 +180,11 @@ public class EnrichStatsAction extends ActionType<EnrichStatsAction.Response> {
public record ExecutingPolicy(String name, TaskInfo taskInfo) implements Writeable, ToXContentFragment {
ExecutingPolicy(StreamInput in) throws IOException {
this(in.readString(), TaskInfo.from(in));
}
/**
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to write these responses until
* we no longer need to support calling this action remotely.
*/
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(name);

View File

@ -23,6 +23,7 @@ import org.elasticsearch.rest.root.MainRestPlugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.rest.ObjectPath;
import org.elasticsearch.transport.netty4.Netty4Plugin;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
import org.elasticsearch.xpack.core.enrich.action.GetEnrichPolicyAction;
import java.io.IOException;
@ -62,6 +63,10 @@ public class EnrichRestActionCancellationIT extends ESIntegTestCase {
runRestActionCancellationTest(new Request(HttpGet.METHOD_NAME, "/_enrich/policy"), GetEnrichPolicyAction.NAME);
}
public void testEnrichStatsCancellation() throws IOException {
runRestActionCancellationTest(new Request(HttpGet.METHOD_NAME, "/_enrich/_stats"), EnrichStatsAction.NAME);
}
private void runRestActionCancellationTest(Request request, String actionName) {
final var node = usually() ? internalCluster().getRandomNodeName() : internalCluster().startCoordinatingOnlyNode(Settings.EMPTY);

View File

@ -9,17 +9,18 @@ package org.elasticsearch.xpack.enrich.action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.local.TransportLocalClusterStateAction;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.CoordinatorStats;
@ -31,34 +32,44 @@ import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
public class TransportEnrichStatsAction extends TransportMasterNodeAction<EnrichStatsAction.Request, EnrichStatsAction.Response> {
public class TransportEnrichStatsAction extends TransportLocalClusterStateAction<EnrichStatsAction.Request, EnrichStatsAction.Response> {
private final Client client;
/**
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC it must be registered with the TransportService until
* we no longer need to support calling this action remotely.
*/
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
@SuppressWarnings("this-escape")
@Inject
public TransportEnrichStatsAction(
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
Client client
) {
super(
EnrichStatsAction.NAME,
transportService,
clusterService,
threadPool,
actionFilters,
EnrichStatsAction.Request::new,
EnrichStatsAction.Response::new,
transportService.getTaskManager(),
clusterService,
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
this.client = client;
transportService.registerRequestHandler(
actionName,
executor,
false,
true,
EnrichStatsAction.Request::new,
(request, channel, task) -> executeDirect(task, request, new ChannelActionListener<>(channel))
);
}
@Override
protected void masterOperation(
protected void localClusterStateOperation(
Task task,
EnrichStatsAction.Request request,
ClusterState state,
@ -101,6 +112,7 @@ public class TransportEnrichStatsAction extends TransportMasterNodeAction<Enrich
.collect(Collectors.toList());
delegate.onResponse(new EnrichStatsAction.Response(policyExecutionTasks, coordinatorStats, cacheStats));
});
((CancellableTask) task).ensureNotCancelled();
client.execute(EnrichCoordinatorStatsAction.INSTANCE, statsRequest, statsListener);
}

View File

@ -12,6 +12,7 @@ import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.rest.Scope;
import org.elasticsearch.rest.ServerlessScope;
import org.elasticsearch.rest.action.RestCancellableNodeClient;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
@ -43,7 +44,11 @@ public class RestEnrichStatsAction extends BaseRestHandler {
@Override
protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) {
final var request = new EnrichStatsAction.Request(RestUtils.getMasterNodeTimeout(restRequest));
return channel -> client.execute(EnrichStatsAction.INSTANCE, request, new RestToXContentListener<>(channel));
return channel -> new RestCancellableNodeClient(client, restRequest.getHttpChannel()).execute(
EnrichStatsAction.INSTANCE,
request,
new RestToXContentListener<>(channel)
);
}
}

View File

@ -1,101 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.enrich.action;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.CacheStats;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.CoordinatorStats;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.ExecutingPolicy;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public class EnrichStatsResponseTests extends AbstractWireSerializingTestCase<EnrichStatsAction.Response> {
@Override
protected EnrichStatsAction.Response createTestInstance() {
int numExecutingPolicies = randomIntBetween(0, 16);
List<ExecutingPolicy> executingPolicies = new ArrayList<>(numExecutingPolicies);
for (int i = 0; i < numExecutingPolicies; i++) {
TaskInfo taskInfo = randomTaskInfo();
executingPolicies.add(new ExecutingPolicy(randomAlphaOfLength(4), taskInfo));
}
int numCoordinatingStats = randomIntBetween(0, 16);
List<CoordinatorStats> coordinatorStats = new ArrayList<>(numCoordinatingStats);
List<CacheStats> cacheStats = new ArrayList<>(numCoordinatingStats);
for (int i = 0; i < numCoordinatingStats; i++) {
String nodeId = randomAlphaOfLength(4);
CoordinatorStats stats = new CoordinatorStats(
nodeId,
randomIntBetween(0, 8096),
randomIntBetween(0, 8096),
randomNonNegativeLong(),
randomNonNegativeLong()
);
coordinatorStats.add(stats);
cacheStats.add(
new CacheStats(
nodeId,
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong()
)
);
}
return new EnrichStatsAction.Response(executingPolicies, coordinatorStats, cacheStats);
}
@Override
protected EnrichStatsAction.Response mutateInstance(EnrichStatsAction.Response instance) {
return null;// TODO implement https://github.com/elastic/elasticsearch/issues/25929
}
@Override
protected Writeable.Reader<EnrichStatsAction.Response> instanceReader() {
return EnrichStatsAction.Response::new;
}
public static TaskInfo randomTaskInfo() {
String nodeId = randomAlphaOfLength(5);
TaskId taskId = new TaskId(nodeId, randomLong());
String type = randomAlphaOfLength(5);
String action = randomAlphaOfLength(5);
String description = randomAlphaOfLength(5);
long startTime = randomLong();
long runningTimeNanos = randomNonNegativeLong();
boolean cancellable = randomBoolean();
boolean cancelled = cancellable && randomBoolean();
TaskId parentTaskId = TaskId.EMPTY_TASK_ID;
Map<String, String> headers = randomBoolean()
? Collections.emptyMap()
: Collections.singletonMap(randomAlphaOfLength(5), randomAlphaOfLength(5));
return new TaskInfo(
taskId,
type,
nodeId,
action,
description,
null,
startTime,
runningTimeNanos,
cancellable,
cancelled,
parentTaskId,
headers
);
}
}

View File

@ -13,6 +13,8 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.CoordinatorStats;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.ExecutingPolicy;
@ -21,9 +23,10 @@ import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
import org.elasticsearch.xpack.monitoring.BaseCollectorTestCase;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.xpack.enrich.action.EnrichStatsResponseTests.randomTaskInfo;
import static org.elasticsearch.xpack.monitoring.MonitoringTestUtils.randomMonitoringNode;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
@ -154,4 +157,34 @@ public class EnrichStatsCollectorTests extends BaseCollectorTestCase {
return new EnrichStatsCollector(clusterService, licenseState, client);
}
public static TaskInfo randomTaskInfo() {
String nodeId = randomAlphaOfLength(5);
TaskId taskId = new TaskId(nodeId, randomLong());
String type = randomAlphaOfLength(5);
String action = randomAlphaOfLength(5);
String description = randomAlphaOfLength(5);
long startTime = randomLong();
long runningTimeNanos = randomNonNegativeLong();
boolean cancellable = randomBoolean();
boolean cancelled = cancellable && randomBoolean();
TaskId parentTaskId = TaskId.EMPTY_TASK_ID;
Map<String, String> headers = randomBoolean()
? Collections.emptyMap()
: Collections.singletonMap(randomAlphaOfLength(5), randomAlphaOfLength(5));
return new TaskInfo(
taskId,
type,
nodeId,
action,
description,
null,
startTime,
runningTimeNanos,
cancellable,
cancelled,
parentTaskId,
headers
);
}
}

View File

@ -23,8 +23,8 @@ import java.util.Map;
import java.util.Optional;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.enrich.action.EnrichStatsResponseTests.randomTaskInfo;
import static org.elasticsearch.xpack.monitoring.collector.enrich.EnrichCoordinatorDocTests.DATE_TIME_FORMATTER;
import static org.elasticsearch.xpack.monitoring.collector.enrich.EnrichStatsCollectorTests.randomTaskInfo;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;