Add IndexingPressureMonitor to monitor large indexing operations (#126372)

Relates ES-11063
This commit is contained in:
Francisco Fernández Castaño 2025-04-15 13:23:18 +02:00 committed by GitHub
parent 0033de9ab3
commit 39670d9477
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 78 additions and 1 deletions

View File

@ -0,0 +1,5 @@
pr: 126372
summary: Add `IndexingPressureMonitor` to monitor large indexing operations
area: CRUD
type: enhancement
issues: []

View File

@ -19,11 +19,13 @@ import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.index.stats.IndexingPressureStats;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
public class IndexingPressure {
public class IndexingPressure implements IndexingPressureMonitor {
public static final Setting<ByteSizeValue> MAX_INDEXING_BYTES = Setting.memorySizeSetting(
"indexing_pressure.memory.limit",
@ -127,6 +129,8 @@ public class IndexingPressure {
private final long replicaLimit;
private final long operationLimit;
private final List<IndexingPressureListener> listeners = new CopyOnWriteArrayList<>();
public IndexingPressure(Settings settings) {
this.lowWatermark = SPLIT_BULK_LOW_WATERMARK.get(settings).getBytes();
this.lowWatermarkSize = SPLIT_BULK_LOW_WATERMARK_SIZE.get(settings).getBytes();
@ -339,12 +343,14 @@ public class IndexingPressure {
long largestOperationSizeInBytes,
boolean allowsOperationsBeyondSizeLimit
) {
listeners.forEach(l -> l.onPrimaryOperationTracked(largestOperationSizeInBytes));
if (largestOperationSizeInBytes > operationLimit) {
this.largeOpsRejections.getAndIncrement();
this.totalRejectedLargeOpsBytes.addAndGet(largestOperationSizeInBytes);
if (allowsOperationsBeyondSizeLimit == false) {
this.primaryRejections.getAndIncrement();
this.primaryDocumentRejections.addAndGet(operations);
listeners.forEach(l -> l.onLargeIndexingOperationRejection(largestOperationSizeInBytes));
throw new EsRejectedExecutionException(
"Request contains an operation of size ["
+ largestOperationSizeInBytes
@ -489,4 +495,14 @@ public class IndexingPressure {
totalRejectedLargeOpsBytes.get()
);
}
@Override
public long getMaxAllowedOperationSizeInBytes() {
return operationLimit;
}
@Override
public void addListener(IndexingPressureListener listener) {
listeners.add(listener);
}
}

View File

@ -0,0 +1,56 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.index;
/**
* Monitors indexing pressure events within the system and tracks operation sizes.
* This interface provides mechanisms to check maximum allowed operation sizes
* and register listeners for indexing pressure events.
*/
public interface IndexingPressureMonitor {
/**
* Returns the maximum allowed size in bytes for any single indexing operation.
* Operations exceeding this limit may be rejected.
*
* @return the maximum allowed operation size in bytes
*/
long getMaxAllowedOperationSizeInBytes();
/**
* Registers a listener to be notified of indexing pressure events.
* The listener will receive callbacks when operations are tracked or rejected.
*
* @param listener the listener to register for indexing pressure events
*/
void addListener(IndexingPressureListener listener);
/**
* Listener interface for receiving notifications about indexing pressure events.
* Implementations can respond to tracking of primary operations and rejections
* of large indexing operations.
*/
interface IndexingPressureListener {
/**
* Called when a primary indexing operation is tracked.
* The implementation should be really lightweight as this is called in a hot path.
*
* @param largestOperationSizeInBytes the size in bytes of the largest operation tracked
*/
void onPrimaryOperationTracked(long largestOperationSizeInBytes);
/**
* Called when a large indexing operation is rejected due to exceeding size limits.
* The implementation should be really lightweight as this is called in a hot path.
*
* @param largestOperationSizeInBytes the size in bytes of the rejected operation
*/
void onLargeIndexingOperationRejection(long largestOperationSizeInBytes);
}
}