Remove some overhead from TransportService message handling (#124428)

Avoiding some indirection, volatile-reads and moving the listener
functionality that needlessly kept iterating an empty CoW list (creating
iterator instances, volatile reads, more code) in an effort to improve
the low IPC on transport threads.
This commit is contained in:
Armin Braun 2025-03-09 16:00:11 +01:00 committed by GitHub
parent bab03b3e35
commit 425823cb5c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 134 additions and 83 deletions

View File

@ -14,13 +14,17 @@ import org.apache.http.nio.entity.NByteArrayEntity;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Request;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.ErrorTraceHelper;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.xcontent.XContentType;
import org.junit.Before;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Collection;
import java.util.function.BooleanSupplier;
import static org.elasticsearch.index.query.QueryBuilders.simpleQueryStringQuery;
@ -28,6 +32,11 @@ import static org.elasticsearch.index.query.QueryBuilders.simpleQueryStringQuery
public class SearchErrorTraceIT extends HttpSmokeTestCase {
private BooleanSupplier hasStackTrace;
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return CollectionUtils.appendToCopyNoNullElements(super.nodePlugins(), MockTransportService.TestPlugin.class);
}
@Before
public void setupMessageListener() {
hasStackTrace = ErrorTraceHelper.setupErrorTraceListener(internalCluster());

View File

@ -66,7 +66,6 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportMessageListener;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.io.UncheckedIOException;
@ -1052,8 +1051,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
final AtomicBoolean blocked = new AtomicBoolean(true);
final TransportService transportService = internalCluster().getInstance(TransportService.class, otherDataNode);
transportService.addMessageListener(new TransportMessageListener() {
MockTransportService.getInstance(otherDataNode).addMessageListener(new TransportMessageListener() {
@Override
public void onRequestSent(
DiscoveryNode node,

View File

@ -60,11 +60,9 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
@ -103,8 +101,7 @@ public class TransportService extends AbstractLifecycleComponent
Setting.Property.Deprecated
);
private final AtomicBoolean handleIncomingRequests = new AtomicBoolean();
private final DelegatingTransportMessageListener messageListener = new DelegatingTransportMessageListener();
private volatile boolean handleIncomingRequests;
protected final Transport transport;
protected final ConnectionManager connectionManager;
protected final ThreadPool threadPool;
@ -134,7 +131,7 @@ public class TransportService extends AbstractLifecycleComponent
// tracer log
private final Logger tracerLog;
private static final Logger tracerLog = Loggers.getLogger(logger, ".tracer");
private final Tracer tracer;
volatile String[] tracerLogInclude;
@ -291,7 +288,6 @@ public class TransportService extends AbstractLifecycleComponent
this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
setTracerLogInclude(TransportSettings.TRACE_LOG_INCLUDE_SETTING.get(settings));
setTracerLogExclude(TransportSettings.TRACE_LOG_EXCLUDE_SETTING.get(settings));
tracerLog = Loggers.getLogger(logger, ".tracer");
this.taskManager = taskManger;
this.interceptor = transportInterceptor;
this.asyncSender = interceptor.interceptSender(this::sendRequestInternal);
@ -432,8 +428,8 @@ public class TransportService extends AbstractLifecycleComponent
* reject any incoming requests, including handshakes, by closing the connection.
*/
public final void acceptIncomingRequests() {
final boolean startedWithThisCall = handleIncomingRequests.compareAndSet(false, true);
assert startedWithThisCall : "transport service was already accepting incoming requests";
assert handleIncomingRequests == false : "transport service was already accepting incoming requests";
handleIncomingRequests = true;
logger.debug("now accepting incoming requests");
}
@ -750,14 +746,6 @@ public class TransportService extends AbstractLifecycleComponent
connectionManager.disconnectFromNode(node);
}
public void addMessageListener(TransportMessageListener listener) {
messageListener.listeners.add(listener);
}
public void removeMessageListener(TransportMessageListener listener) {
messageListener.listeners.remove(listener);
}
public void addConnectionListener(TransportConnectionListener listener) {
connectionManager.addListener(listener);
}
@ -1265,13 +1253,12 @@ public class TransportService extends AbstractLifecycleComponent
*/
@Override
public void onRequestReceived(long requestId, String action) {
if (handleIncomingRequests.get() == false) {
if (handleIncomingRequests == false) {
throw new TransportNotReadyException();
}
if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) {
tracerLog.trace("[{}][{}] received request", requestId, action);
}
messageListener.onRequestReceived(requestId, action);
}
/** called by the {@link Transport} implementation once a request has been sent */
@ -1286,7 +1273,6 @@ public class TransportService extends AbstractLifecycleComponent
if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) {
tracerLog.trace("[{}][{}] sent to [{}] (timeout: [{}])", requestId, action, node, options.timeout());
}
messageListener.onRequestSent(node, requestId, action, request, options);
}
@Override
@ -1297,7 +1283,6 @@ public class TransportService extends AbstractLifecycleComponent
} else if (tracerLog.isTraceEnabled() && shouldTraceAction(holder.action())) {
tracerLog.trace("[{}][{}] received response from [{}]", requestId, holder.action(), holder.connection().getNode());
}
messageListener.onResponseReceived(requestId, holder);
}
/** called by the {@link Transport} implementation once a response was sent to calling node */
@ -1306,7 +1291,6 @@ public class TransportService extends AbstractLifecycleComponent
if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) {
tracerLog.trace("[{}][{}] sent response", requestId, action);
}
messageListener.onResponseSent(requestId, action, response);
}
/** called by the {@link Transport} implementation after an exception was sent as a response to an incoming request */
@ -1315,7 +1299,6 @@ public class TransportService extends AbstractLifecycleComponent
if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) {
tracerLog.trace(() -> format("[%s][%s] sent error response", requestId, action), e);
}
messageListener.onResponseSent(requestId, action, e);
}
public RequestHandlerRegistry<? extends TransportRequest> getRequestHandler(String action) {
@ -1453,6 +1436,7 @@ public class TransportService extends AbstractLifecycleComponent
public void cancel() {
assert responseHandlers.contains(requestId) == false
: "cancel must be called after the requestId [" + requestId + "] has been removed from clientHandlers";
var cancellable = this.cancellable;
if (cancellable != null) {
cancellable.cancel();
}
@ -1492,6 +1476,7 @@ public class TransportService extends AbstractLifecycleComponent
@Override
public void handleResponse(T response) {
var handler = this.handler;
if (handler != null) {
handler.cancel();
}
@ -1502,6 +1487,7 @@ public class TransportService extends AbstractLifecycleComponent
@Override
public void handleException(TransportException exp) {
var handler = this.handler;
if (handler != null) {
handler.cancel();
}
@ -1666,53 +1652,6 @@ public class TransportService extends AbstractLifecycleComponent
return discoveryNode.equals(localNode);
}
private static final class DelegatingTransportMessageListener implements TransportMessageListener {
private final List<TransportMessageListener> listeners = new CopyOnWriteArrayList<>();
@Override
public void onRequestReceived(long requestId, String action) {
for (TransportMessageListener listener : listeners) {
listener.onRequestReceived(requestId, action);
}
}
@Override
public void onResponseSent(long requestId, String action, TransportResponse response) {
for (TransportMessageListener listener : listeners) {
listener.onResponseSent(requestId, action, response);
}
}
@Override
public void onResponseSent(long requestId, String action, Exception error) {
for (TransportMessageListener listener : listeners) {
listener.onResponseSent(requestId, action, error);
}
}
@Override
public void onRequestSent(
DiscoveryNode node,
long requestId,
String action,
TransportRequest request,
TransportRequestOptions finalOptions
) {
for (TransportMessageListener listener : listeners) {
listener.onRequestSent(node, requestId, action, request, finalOptions);
}
}
@Override
@SuppressWarnings("rawtypes")
public void onResponseReceived(long requestId, Transport.ResponseContext holder) {
for (TransportMessageListener listener : listeners) {
listener.onResponseReceived(requestId, holder);
}
}
}
private static class PendingDirectHandlers extends AbstractRefCounted {
// To handle a response we (i) remove the handler from responseHandlers and then (ii) enqueue an action to complete the handler on

View File

@ -11,6 +11,7 @@ package org.elasticsearch.search;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportMessageListener;
import org.elasticsearch.transport.TransportService;
@ -18,6 +19,8 @@ import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;
import static org.elasticsearch.test.ESTestCase.asInstanceOf;
/**
* Utilities around testing the `error_trace` message header in search.
*/
@ -26,16 +29,20 @@ public enum ErrorTraceHelper {
public static BooleanSupplier setupErrorTraceListener(InternalTestCluster internalCluster) {
final AtomicBoolean transportMessageHasStackTrace = new AtomicBoolean(false);
internalCluster.getDataNodeInstances(TransportService.class).forEach(ts -> ts.addMessageListener(new TransportMessageListener() {
@Override
public void onResponseSent(long requestId, String action, Exception error) {
TransportMessageListener.super.onResponseSent(requestId, action, error);
if (action.startsWith("indices:data/read/search")) {
Optional<Throwable> throwable = ExceptionsHelper.unwrapCausesAndSuppressed(error, t -> t.getStackTrace().length > 0);
transportMessageHasStackTrace.set(throwable.isPresent());
internalCluster.getDataNodeInstances(TransportService.class)
.forEach(ts -> asInstanceOf(MockTransportService.class, ts).addMessageListener(new TransportMessageListener() {
@Override
public void onResponseSent(long requestId, String action, Exception error) {
TransportMessageListener.super.onResponseSent(requestId, action, error);
if (action.startsWith("indices:data/read/search")) {
Optional<Throwable> throwable = ExceptionsHelper.unwrapCausesAndSuppressed(
error,
t -> t.getStackTrace().length > 0
);
transportMessageHasStackTrace.set(throwable.isPresent());
}
}
}
}));
}));
return transportMessageHasStackTrace::get;
}
}

View File

@ -57,8 +57,10 @@ import org.elasticsearch.transport.RequestHandlerRegistry;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportMessageListener;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.TransportSettings;
import org.elasticsearch.transport.netty4.Netty4Transport;
@ -106,6 +108,8 @@ public class MockTransportService extends TransportService {
private final List<Runnable> onStopListeners = new CopyOnWriteArrayList<>();
private final AtomicReference<Consumer<Transport.Connection>> onConnectionClosedCallback = new AtomicReference<>();
private final DelegatingTransportMessageListener messageListener = new DelegatingTransportMessageListener();
public static class TestPlugin extends Plugin {
@Override
public List<Setting<?>> getSettings() {
@ -814,4 +818,96 @@ public class MockTransportService extends TransportService {
assertTrue(ThreadPool.terminate(testExecutor, 10, TimeUnit.SECONDS));
}
}
@Override
public void onRequestReceived(long requestId, String action) {
super.onRequestReceived(requestId, action);
messageListener.onRequestReceived(requestId, action);
}
@Override
public void onRequestSent(
DiscoveryNode node,
long requestId,
String action,
TransportRequest request,
TransportRequestOptions options
) {
super.onRequestSent(node, requestId, action, request, options);
messageListener.onRequestSent(node, requestId, action, request, options);
}
@Override
@SuppressWarnings("rawtypes")
public void onResponseReceived(long requestId, Transport.ResponseContext holder) {
super.onResponseReceived(requestId, holder);
messageListener.onResponseReceived(requestId, holder);
}
@Override
public void onResponseSent(long requestId, String action, TransportResponse response) {
super.onResponseSent(requestId, action, response);
messageListener.onResponseSent(requestId, action, response);
}
@Override
public void onResponseSent(long requestId, String action, Exception e) {
super.onResponseSent(requestId, action, e);
messageListener.onResponseSent(requestId, action, e);
}
public void addMessageListener(TransportMessageListener listener) {
messageListener.listeners.add(listener);
}
public void removeMessageListener(TransportMessageListener listener) {
messageListener.listeners.remove(listener);
}
private static final class DelegatingTransportMessageListener implements TransportMessageListener {
private final List<TransportMessageListener> listeners = new CopyOnWriteArrayList<>();
@Override
public void onRequestReceived(long requestId, String action) {
for (TransportMessageListener listener : listeners) {
listener.onRequestReceived(requestId, action);
}
}
@Override
public void onResponseSent(long requestId, String action, TransportResponse response) {
for (TransportMessageListener listener : listeners) {
listener.onResponseSent(requestId, action, response);
}
}
@Override
public void onResponseSent(long requestId, String action, Exception error) {
for (TransportMessageListener listener : listeners) {
listener.onResponseSent(requestId, action, error);
}
}
@Override
public void onRequestSent(
DiscoveryNode node,
long requestId,
String action,
TransportRequest request,
TransportRequestOptions finalOptions
) {
for (TransportMessageListener listener : listeners) {
listener.onRequestSent(node, requestId, action, request, finalOptions);
}
}
@Override
@SuppressWarnings("rawtypes")
public void onResponseReceived(long requestId, Transport.ResponseContext holder) {
for (TransportMessageListener listener : listeners) {
listener.onResponseReceived(requestId, holder);
}
}
}
}

View File

@ -9,17 +9,18 @@ package org.elasticsearch.xpack.search;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.ErrorTraceHelper;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.xcontent.XContentType;
import org.junit.Before;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.BooleanSupplier;
@ -31,8 +32,9 @@ public class AsyncSearchErrorTraceIT extends ESIntegTestCase {
}
@Override
@SuppressWarnings("unchecked")
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(AsyncSearch.class);
return CollectionUtils.appendToCopyNoNullElements(super.nodePlugins(), AsyncSearch.class, MockTransportService.TestPlugin.class);
}
private BooleanSupplier transportMessageHasStackTrace;