Migrate simple usages of ThreadPool#schedule (#99051)

In #99027 we deprecated the string-based version of
`ThreadPool#schedule`. This commit migrates all the simple usages of
this API to the new version.
This commit is contained in:
David Turner 2023-08-31 07:37:31 +01:00 committed by GitHub
parent 559cabdeef
commit a20ee3f8f2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
67 changed files with 180 additions and 165 deletions

View File

@ -338,7 +338,7 @@ public class GeoIpDownloader extends AllocatedPersistentTask {
private void scheduleNextRun(TimeValue time) {
if (threadPool.scheduler().isShutdown() == false) {
scheduled = threadPool.schedule(this::runDownloader, time, ThreadPool.Names.GENERIC);
scheduled = threadPool.schedule(this::runDownloader, time, threadPool.generic());
}
}

View File

@ -89,6 +89,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -392,7 +393,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
worker.rethrottle(1);
setupClient(new TestThreadPool(getTestName()) {
@Override
public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) {
public ScheduledCancellable schedule(Runnable command, TimeValue delay, Executor executor) {
// While we're here we can check that the sleep made it through
assertThat(delay.nanos(), greaterThan(0L));
assertThat(delay.seconds(), lessThanOrEqualTo(10L));
@ -518,7 +519,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
AtomicReference<Runnable> capturedCommand = new AtomicReference<>();
setupClient(new TestThreadPool(getTestName()) {
@Override
public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) {
public ScheduledCancellable schedule(Runnable command, TimeValue delay, Executor executor) {
capturedDelay.set(delay);
capturedCommand.set(command);
return new ScheduledCancellable() {
@ -734,7 +735,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
*/
setupClient(new TestThreadPool(getTestName()) {
@Override
public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) {
public ScheduledCancellable schedule(Runnable command, TimeValue delay, Executor executor) {
/*
* This is called twice:
* 1. To schedule the throttling. When that happens we immediately cancel the task.
@ -745,7 +746,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
if (delay.nanos() > 0) {
generic().execute(() -> taskManager.cancel(testTask, reason, () -> {}));
}
return super.schedule(command, delay, name);
return super.schedule(command, delay, executor);
}
});

View File

@ -61,6 +61,7 @@ import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
@ -103,7 +104,7 @@ public class RemoteScrollableHitSourceTests extends ESTestCase {
}
@Override
public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) {
public ScheduledCancellable schedule(Runnable command, TimeValue delay, Executor name) {
command.run();
return null;
}

View File

@ -43,6 +43,7 @@ import org.elasticsearch.xcontent.NamedXContentRegistry;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
@ -193,6 +194,8 @@ class S3Repository extends MeteredBlobStoreRepository {
*/
private final TimeValue coolDown;
private final Executor snapshotExecutor;
/**
* Constructs an s3 backed repository
*/
@ -214,6 +217,7 @@ class S3Repository extends MeteredBlobStoreRepository {
buildLocation(metadata)
);
this.service = service;
this.snapshotExecutor = threadPool().executor(ThreadPool.Names.SNAPSHOT);
// Parse and validate the user's S3 Storage Class setting
this.bucket = BUCKET_SETTING.get(metadata.settings());
@ -331,7 +335,7 @@ class S3Repository extends MeteredBlobStoreRepository {
final Scheduler.Cancellable cancellable = finalizationFuture.getAndSet(null);
assert cancellable != null;
listener.onRepositoryDataWritten(repositoryData);
}, coolDown, ThreadPool.Names.SNAPSHOT));
}, coolDown, snapshotExecutor));
assert existing == null : "Already have an ongoing finalization " + finalizationFuture;
}
@ -342,7 +346,7 @@ class S3Repository extends MeteredBlobStoreRepository {
final Scheduler.Cancellable cancellable = finalizationFuture.getAndSet(null);
assert cancellable != null;
listener.onFailure(e);
}, coolDown, ThreadPool.Names.SNAPSHOT));
}, coolDown, snapshotExecutor));
assert existing == null : "Already have an ongoing finalization " + finalizationFuture;
}
};
@ -364,11 +368,7 @@ class S3Repository extends MeteredBlobStoreRepository {
public void onResponse(T response) {
logCooldownInfo();
final Scheduler.Cancellable existing = finalizationFuture.getAndSet(
threadPool.schedule(
ActionRunnable.wrap(wrappedListener, l -> l.onResponse(response)),
coolDown,
ThreadPool.Names.SNAPSHOT
)
threadPool.schedule(ActionRunnable.wrap(wrappedListener, l -> l.onResponse(response)), coolDown, snapshotExecutor)
);
assert existing == null : "Already have an ongoing finalization " + finalizationFuture;
}
@ -377,7 +377,7 @@ class S3Repository extends MeteredBlobStoreRepository {
public void onFailure(Exception e) {
logCooldownInfo();
final Scheduler.Cancellable existing = finalizationFuture.getAndSet(
threadPool.schedule(ActionRunnable.wrap(wrappedListener, l -> l.onFailure(e)), coolDown, ThreadPool.Names.SNAPSHOT)
threadPool.schedule(ActionRunnable.wrap(wrappedListener, l -> l.onFailure(e)), coolDown, snapshotExecutor)
);
assert existing == null : "Already have an ongoing finalization " + finalizationFuture;
}

View File

@ -74,7 +74,7 @@ public interface NamedGroupExtractor {
interval.millis(),
maxExecutionTime.millis(),
threadPool::relativeTimeInMillis,
(delay, command) -> threadPool.schedule(command, TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC)
(delay, command) -> threadPool.schedule(command, TimeValue.timeValueMillis(delay), threadPool.generic())
);
})::getOrCompute;
}

View File

@ -21,6 +21,7 @@ import org.junit.Before;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -49,7 +50,7 @@ public class EvilThreadPoolTests extends ESTestCase {
for (String executor : ThreadPool.THREAD_POOL_TYPES.keySet()) {
checkExecutionError(getExecuteRunner(threadPool.executor(executor)));
checkExecutionError(getSubmitRunner(threadPool.executor(executor)));
checkExecutionError(getScheduleRunner(executor));
checkExecutionError(getScheduleRunner(threadPool.executor(executor)));
}
}
@ -158,7 +159,7 @@ public class EvilThreadPoolTests extends ESTestCase {
// here, it's ok for the exception not to bubble up. Accessing the future will yield the exception
checkExecutionException(getSubmitRunner(threadPool.executor(executor)), false);
checkExecutionException(getScheduleRunner(executor), true);
checkExecutionException(getScheduleRunner(threadPool.executor(executor)), true);
}
}
@ -310,7 +311,7 @@ public class EvilThreadPoolTests extends ESTestCase {
};
}
Consumer<Runnable> getScheduleRunner(String executor) {
Consumer<Runnable> getScheduleRunner(Executor executor) {
return new Consumer<Runnable>() {
@Override
public void accept(Runnable runnable) {

View File

@ -538,7 +538,7 @@ public class CancellableTasksIT extends ESIntegTestCase {
}
listener.onResponse(new TestResponse());
}
}, delay, ThreadPool.Names.GENERIC);
}, delay, transportService.getThreadPool().generic());
}
@Override

View File

@ -429,7 +429,7 @@ public class BulkProcessor2 implements Closeable {
}
cancellableFlushTask = null;
}
}, flushInterval, ThreadPool.Names.GENERIC);
}, flushInterval, threadPool.generic());
}
}
}

View File

@ -51,7 +51,7 @@ public class WriteAckDelay implements Consumer<Runnable> {
this.threadPool.scheduleWithFixedDelay(
new ScheduleTask(),
TimeValue.timeValueNanos(writeDelayIntervalNanos),
ThreadPool.Names.GENERIC
this.threadPool.generic()
);
}
@ -80,7 +80,7 @@ public class WriteAckDelay implements Consumer<Runnable> {
writeDelayInterval,
randomDelay
);
threadPool.schedule(new CompletionTask(tasks), randomDelay, ThreadPool.Names.GENERIC);
threadPool.schedule(new CompletionTask(tasks), randomDelay, threadPool.generic());
}
}

View File

@ -1169,7 +1169,7 @@ public class CoordinationDiagnosticsService implements ClusterStateListener {
public String toString() {
return "delayed retrieval of coordination diagnostics info from " + masterEligibleNode;
}
}, remoteRequestInitialDelay, ThreadPool.Names.CLUSTER_COORDINATION);
}, remoteRequestInitialDelay, clusterCoordinationExecutor);
}
void cancelPollingRemoteMasterStabilityDiagnostic() {

View File

@ -1904,7 +1904,7 @@ public class Coordinator extends AbstractLifecycleComponent implements ClusterSt
public String toString() {
return "scheduled timeout for " + CoordinatorPublication.this;
}
}, publishTimeout, Names.CLUSTER_COORDINATION);
}, publishTimeout, clusterCoordinationExecutor);
this.infoTimeoutHandler = transportService.getThreadPool().schedule(new Runnable() {
@Override
@ -1918,7 +1918,7 @@ public class Coordinator extends AbstractLifecycleComponent implements ClusterSt
public String toString() {
return "scheduled timeout for reporting on " + CoordinatorPublication.this;
}
}, publishInfoTimeout, Names.CLUSTER_COORDINATION);
}, publishInfoTimeout, clusterCoordinationExecutor);
}
private void removePublicationAndPossiblyBecomeCandidate(String reason) {

View File

@ -22,6 +22,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.monitor.NodeHealthService;
@ -413,7 +414,7 @@ public class FollowersChecker {
public String toString() {
return FollowerChecker.this + "::handleWakeUp";
}
}, followerCheckInterval, Names.SAME);
}, followerCheckInterval, EsExecutors.DIRECT_EXECUTOR_SERVICE);
}
@Override

View File

@ -375,7 +375,7 @@ public class JoinValidationService {
public String toString() {
return cacheClearer + " after timeout";
}
}, cacheTimeout, ThreadPool.Names.CLUSTER_COORDINATION);
}, cacheTimeout, responseExecutor);
}
} catch (Exception e) {
assert e instanceof EsRejectedExecutionException esre && esre.isExecutorShutdown() : e;

View File

@ -19,6 +19,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
@ -392,7 +393,7 @@ public class LeaderChecker {
public String toString() {
return "scheduled check of leader " + leader;
}
}, leaderCheckInterval, Names.SAME);
}, leaderCheckInterval, EsExecutors.DIRECT_EXECUTOR_SERVICE);
}
}

View File

@ -123,7 +123,7 @@ public class StoreHeartbeatService implements LeaderHeartbeatService {
assert 0 < heartbeatTerm : heartbeatTerm;
this.heartbeatTerm = heartbeatTerm;
this.rerunListener = listener.delegateFailureAndWrap(
(l, scheduleDelay) -> threadPool.schedule(HeartbeatTask.this, scheduleDelay, ThreadPool.Names.GENERIC)
(l, scheduleDelay) -> threadPool.schedule(HeartbeatTask.this, scheduleDelay, threadPool.generic())
);
}

View File

@ -21,6 +21,7 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.threadpool.Scheduler;
@ -96,7 +97,7 @@ public class DelayedAllocationService extends AbstractLifecycleComponent impleme
logger.warn("failed to submit schedule/execute reroute post unassigned shard", e);
removeIfSameTask(DelayedRerouteTask.this);
}
}, nextDelay, ThreadPool.Names.SAME);
}, nextDelay, EsExecutors.DIRECT_EXECUTOR_SERVICE);
}
@Override

View File

@ -267,7 +267,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
return;
}
if (timeout != null) {
notifyTimeout.cancellable = threadPool.schedule(notifyTimeout, timeout, ThreadPool.Names.GENERIC);
notifyTimeout.cancellable = threadPool.schedule(notifyTimeout, timeout, threadPool.generic());
}
listener.postAdded();
}

View File

@ -714,7 +714,7 @@ public class MasterService extends AbstractLifecycleComponent {
} else if (countDown.countDown()) {
finish();
} else {
this.ackTimeoutCallback = threadPool.schedule(this::onTimeout, timeLeft, ThreadPool.Names.GENERIC);
this.ackTimeoutCallback = threadPool.schedule(this::onTimeout, timeLeft, threadPool.generic());
// re-check if onNodeAck has not completed while we were scheduling the timeout
if (countDown.isCountedDown()) {
ackTimeoutCallback.cancel();
@ -1525,7 +1525,7 @@ public class MasterService extends AbstractLifecycleComponent {
timeoutCancellable = threadPool.schedule(
new TaskTimeoutHandler<>(timeout, source, taskHolder),
timeout,
ThreadPool.Names.GENERIC
threadPool.generic()
);
} else {
timeoutCancellable = null;

View File

@ -188,7 +188,7 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste
runRecovery();
}
}
}, recoverAfterTime, ThreadPool.Names.GENERIC);
}, recoverAfterTime, threadPool.generic());
}
} else {
if (recoveryInProgress.compareAndSet(false, true)) {

View File

@ -12,6 +12,7 @@ import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DelegatingActionListener;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
@ -60,6 +61,6 @@ class RetryListener extends DelegatingActionListener<ScrollableHitSource.Respons
}
private void schedule(Runnable runnable, TimeValue delay) {
threadPool.schedule(runnable, delay, ThreadPool.Names.SAME);
threadPool.schedule(runnable, delay, EsExecutors.DIRECT_EXECUTOR_SERVICE);
}
}

View File

@ -252,7 +252,7 @@ public class WorkerBulkByScrollTaskState implements SuccessfullyProcessed {
this.scheduled = threadPool.schedule(() -> {
throttledNanos.addAndGet(delay.nanos());
command.run();
}, delay, ThreadPool.Names.GENERIC);
}, delay, threadPool.generic());
}
DelayedPrepareBulkRequest rethrottle(float newRequestsPerSecond) {

View File

@ -261,7 +261,7 @@ public class IndicesService extends AbstractLifecycleComponent
@Override
protected void doStart() {
// Start thread that will manage cleaning the field data cache periodically
threadPool.schedule(this.cacheCleaner, this.cleanInterval, ThreadPool.Names.SAME);
threadPool.schedule(this.cacheCleaner, this.cleanInterval, EsExecutors.DIRECT_EXECUTOR_SERVICE);
// Start watching for timestamp fields
clusterService.addStateApplier(timestampFieldMapperService);

View File

@ -82,7 +82,7 @@ public class RecoveriesCollection {
threadPool.schedule(
new RecoveryMonitor(recoveryTarget.recoveryId(), recoveryTarget.lastAccessTime(), activityTimeout),
activityTimeout,
ThreadPool.Names.GENERIC
threadPool.generic()
);
}
@ -321,7 +321,7 @@ public class RecoveriesCollection {
}
lastSeenAccessTime = accessTime;
logger.trace("[monitor] rescheduling check for [{}]. last access time is [{}]", recoveryId, lastSeenAccessTime);
threadPool.schedule(this, checkInterval, ThreadPool.Names.GENERIC);
threadPool.schedule(this, checkInterval, threadPool.generic());
}
}

View File

@ -118,7 +118,7 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
private volatile ClusterState state;
private static BiFunction<Long, Runnable, Scheduler.ScheduledCancellable> createScheduler(ThreadPool threadPool) {
return (delay, command) -> threadPool.schedule(command, TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC);
return (delay, command) -> threadPool.schedule(command, TimeValue.timeValueMillis(delay), threadPool.generic());
}
public static MatcherWatchdog createGrokThreadWatchdog(Environment env, ThreadPool threadPool) {

View File

@ -40,6 +40,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
@ -569,7 +570,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
new ElasticsearchTimeoutException("Wait for seq_no [{}] refreshed timed out [{}]", waitForCheckpoint, timeout)
);
}
}, timeout, Names.SAME);
}, timeout, EsExecutors.DIRECT_EXECUTOR_SERVICE);
// allow waiting for not-yet-issued sequence number if shard isn't promotable to primary and the timeout is less than or equal
// to 30s
@ -588,7 +589,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
shard.addGlobalCheckpointListener(waitForCheckpoint, new GlobalCheckpointListeners.GlobalCheckpointListener() {
@Override
public Executor executor() {
return threadPool.executor(Names.SAME);
return EsExecutors.DIRECT_EXECUTOR_SERVICE;
}
@Override

View File

@ -21,6 +21,7 @@ import org.elasticsearch.client.internal.Requests;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.indices.SystemIndexDescriptor;
@ -103,7 +104,7 @@ public class TaskResultsService {
} else {
TimeValue wait = backoff.next();
logger.warn(() -> "failed to store task result, retrying in [" + wait + "]", e);
threadPool.schedule(() -> doStoreResult(backoff, index, listener), wait, ThreadPool.Names.SAME);
threadPool.schedule(() -> doStoreResult(backoff, index, listener), wait, EsExecutors.DIRECT_EXECUTOR_SERVICE);
}
}
});

View File

@ -459,7 +459,7 @@ public class ThreadPool implements ReportingService<ThreadPoolInfo>, Scheduler {
@Override
@SuppressWarnings("removal")
@Deprecated(forRemoval = true)
public ScheduledCancellable schedule(Runnable command, TimeValue delay, String executor) {
public final ScheduledCancellable schedule(Runnable command, TimeValue delay, String executor) {
return schedule(command, delay, executor(executor));
}

View File

@ -416,7 +416,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
}
TimeValue connectTimeout = connectionProfile.getConnectTimeout();
threadPool.schedule(channelsConnectedListener::onTimeout, connectTimeout, ThreadPool.Names.GENERIC);
threadPool.schedule(channelsConnectedListener::onTimeout, connectTimeout, threadPool.generic());
}
@Override

View File

@ -159,7 +159,7 @@ final class TransportHandshaker {
threadPool.schedule(
() -> handler.handleLocalException(new ConnectTransportException(node, "handshake_timeout[" + timeout + "]")),
timeout,
ThreadPool.Names.GENERIC
threadPool.generic()
);
success = true;
} catch (Exception e) {

View File

@ -140,7 +140,7 @@ final class TransportKeepAlive implements Closeable {
void ensureStarted() {
if (isStarted.get() == false && isStarted.compareAndSet(false, true)) {
threadPool.schedule(this, pingInterval, ThreadPool.Names.GENERIC);
threadPool.schedule(this, pingInterval, threadPool.generic());
}
}

View File

@ -1419,7 +1419,7 @@ public class TransportService extends AbstractLifecycleComponent
}
private void scheduleTimeout(TimeValue timeout) {
this.cancellable = threadPool.schedule(this, timeout, ThreadPool.Names.GENERIC);
this.cancellable = threadPool.schedule(this, timeout, threadPool.generic());
}
}

View File

@ -14,6 +14,7 @@ import org.elasticsearch.threadpool.TestThreadPool;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Executor;
class CapturingThreadPool extends TestThreadPool {
final Deque<Tuple<TimeValue, Runnable>> scheduledTasks = new ArrayDeque<>();
@ -23,7 +24,7 @@ class CapturingThreadPool extends TestThreadPool {
}
@Override
public ScheduledCancellable schedule(Runnable task, TimeValue delay, String executor) {
public ScheduledCancellable schedule(Runnable task, TimeValue delay, Executor executor) {
scheduledTasks.add(new Tuple<>(delay, task));
return null;
}

View File

@ -22,6 +22,7 @@ import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Delayed;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -134,7 +135,7 @@ public class WorkerBulkByScrollTaskStateTests extends ESTestCase {
int batchSizeForMaxDelay = (int) (maxDelay.seconds() * originalRequestsPerSecond);
ThreadPool threadPool = new TestThreadPool(getTestName()) {
@Override
public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) {
public ScheduledCancellable schedule(Runnable command, TimeValue delay, Executor name) {
assertThat(delay.nanos(), both(greaterThanOrEqualTo(0L)).and(lessThanOrEqualTo(maxDelay.nanos())));
return super.schedule(command, delay, name);
}
@ -185,7 +186,7 @@ public class WorkerBulkByScrollTaskStateTests extends ESTestCase {
public void testDelayNeverNegative() throws IOException {
// Thread pool that returns a ScheduledFuture that claims to have a negative delay
ThreadPool threadPool = new TestThreadPool("test") {
public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) {
public ScheduledCancellable schedule(Runnable command, TimeValue delay, Executor name) {
return new ScheduledCancellable() {
@Override
public long getDelay(TimeUnit unit) {

View File

@ -11,17 +11,18 @@ package org.elasticsearch.threadpool;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.Scheduler.Cancellable;
import org.elasticsearch.threadpool.Scheduler.ReschedulingRunnable;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.junit.After;
import org.junit.Before;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -32,7 +33,7 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
@ -71,20 +72,14 @@ public class ScheduleWithFixedDelayTests extends ESTestCase {
Thread.currentThread().interrupt();
}
};
ReschedulingRunnable reschedulingRunnable = new ReschedulingRunnable(
runnable,
delay,
Names.GENERIC,
threadPool,
(e) -> {},
(e) -> {}
);
final Executor executor = mock(Executor.class);
ReschedulingRunnable reschedulingRunnable = new ReschedulingRunnable(runnable, delay, executor, threadPool, e -> {}, e -> {});
// not scheduled yet
verify(threadPool, never()).schedule(any(), any(), anyString());
verify(threadPool, never()).schedule(any(), any(), any(Executor.class));
reschedulingRunnable.start();
// this call was made by start
verify(threadPool, times(1)).schedule(reschedulingRunnable, delay, Names.GENERIC);
verify(threadPool, times(1)).schedule(same(reschedulingRunnable), same(delay), same(executor));
// create a thread and start the runnable
Thread runThread = new Thread() {
@ -104,7 +99,7 @@ public class ScheduleWithFixedDelayTests extends ESTestCase {
runThread.join();
// validate schedule was called again
verify(threadPool, times(2)).schedule(reschedulingRunnable, delay, Names.GENERIC);
verify(threadPool, times(2)).schedule(same(reschedulingRunnable), same(delay), same(executor));
}
public void testThatRunnableIsRescheduled() throws Exception {
@ -121,7 +116,7 @@ public class ScheduleWithFixedDelayTests extends ESTestCase {
}
};
Cancellable cancellable = threadPool.scheduleWithFixedDelay(countingRunnable, TimeValue.timeValueMillis(10L), Names.GENERIC);
Cancellable cancellable = threadPool.scheduleWithFixedDelay(countingRunnable, TimeValue.timeValueMillis(10L), threadPool.generic());
assertNotNull(cancellable);
// wait for the number of successful count down operations
@ -169,7 +164,7 @@ public class ScheduleWithFixedDelayTests extends ESTestCase {
throw new RuntimeException("throw at end");
}
};
Cancellable cancellable = threadPool.scheduleWithFixedDelay(countingRunnable, TimeValue.timeValueMillis(10L), Names.GENERIC);
Cancellable cancellable = threadPool.scheduleWithFixedDelay(countingRunnable, TimeValue.timeValueMillis(10L), threadPool.generic());
cancellableRef.set(cancellable);
// wait for the runnable to finish
doneLatch.await();
@ -203,7 +198,11 @@ public class ScheduleWithFixedDelayTests extends ESTestCase {
}
};
Cancellable cancellable = threadPool.scheduleWithFixedDelay(runnable, TimeValue.timeValueMillis(10L), Names.SAME);
Cancellable cancellable = threadPool.scheduleWithFixedDelay(
runnable,
TimeValue.timeValueMillis(10L),
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
Object resultingObject = resultsFuture.get();
assertNotNull(resultingObject);
assertThat(resultingObject, instanceOf(Throwable.class));
@ -236,7 +235,7 @@ public class ScheduleWithFixedDelayTests extends ESTestCase {
}
};
final Cancellable cancellable = threadPool.scheduleWithFixedDelay(runnable, TimeValue.timeValueMillis(10L), Names.GENERIC);
final Cancellable cancellable = threadPool.scheduleWithFixedDelay(runnable, TimeValue.timeValueMillis(10L), threadPool.generic());
assertFalse(resultsFuture.isDone());
final Object o = new Object();
@ -252,7 +251,7 @@ public class ScheduleWithFixedDelayTests extends ESTestCase {
terminate(threadPool);
threadPool = new ThreadPool(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "fixed delay tests").build()) {
@Override
public ScheduledCancellable schedule(Runnable command, TimeValue delay, String executor) {
public ScheduledCancellable schedule(Runnable command, TimeValue delay, Executor executor) {
if (command instanceof ReschedulingRunnable) {
((ReschedulingRunnable) command).onRejection(new EsRejectedExecutionException());
} else {
@ -265,7 +264,7 @@ public class ScheduleWithFixedDelayTests extends ESTestCase {
ReschedulingRunnable reschedulingRunnable = new ReschedulingRunnable(
runnable,
delay,
Names.GENERIC,
threadPool.generic(),
threadPool,
(e) -> {},
(e) -> {}
@ -284,7 +283,7 @@ public class ScheduleWithFixedDelayTests extends ESTestCase {
};
final TimeValue interval = TimeValue.timeValueMillis(50L);
final Cancellable cancellable = threadPool.scheduleWithFixedDelay(countingRunnable, interval, Names.GENERIC);
final Cancellable cancellable = threadPool.scheduleWithFixedDelay(countingRunnable, interval, threadPool.generic());
doneLatch.await();
cancellable.cancel();

View File

@ -40,7 +40,11 @@ public class SchedulerTests extends ESTestCase {
}
private void scheduleAndCancel(ThreadPool threadPool, AtomicLong executed, String type) {
Scheduler.ScheduledCancellable scheduled = threadPool.schedule(executed::incrementAndGet, TimeValue.timeValueSeconds(20), type);
Scheduler.ScheduledCancellable scheduled = threadPool.schedule(
executed::incrementAndGet,
TimeValue.timeValueSeconds(20),
threadPool.executor(type)
);
assertEquals(1, schedulerQueueSize(threadPool));
assertFalse(scheduled.isCancelled());
assertTrue(scheduled.cancel());
@ -77,7 +81,7 @@ public class SchedulerTests extends ESTestCase {
ThreadPool threadPool = new TestThreadPool("test");
try {
List<Scheduler.ScheduledCancellable> jobs = LongStream.range(20, 30)
.mapToObj(delay -> threadPool.schedule(() -> {}, TimeValue.timeValueSeconds(delay), ThreadPool.Names.SAME))
.mapToObj(delay -> threadPool.schedule(() -> {}, TimeValue.timeValueSeconds(delay), EsExecutors.DIRECT_EXECUTOR_SERVICE))
.collect(Collectors.toCollection(ArrayList::new));
Collections.reverse(jobs);
@ -118,7 +122,13 @@ public class SchedulerTests extends ESTestCase {
CountDownLatch missingExecutions = new CountDownLatch(ThreadPool.THREAD_POOL_TYPES.keySet().size());
try {
ThreadPool.THREAD_POOL_TYPES.keySet()
.forEach(type -> threadPool.schedule(missingExecutions::countDown, TimeValue.timeValueMillis(randomInt(5)), type));
.forEach(
type -> threadPool.schedule(
missingExecutions::countDown,
TimeValue.timeValueMillis(randomInt(5)),
threadPool.executor(type)
)
);
assertTrue(missingExecutions.await(30, TimeUnit.SECONDS));
} finally {

View File

@ -310,7 +310,7 @@ public class ThreadPoolTests extends ESTestCase {
return "slow-test-task";
}
};
threadPool.schedule(runnable, TimeValue.timeValueMillis(randomLongBetween(0, 300)), ThreadPool.Names.SAME);
threadPool.schedule(runnable, TimeValue.timeValueMillis(randomLongBetween(0, 300)), EsExecutors.DIRECT_EXECUTOR_SERVICE);
assertBusy(appender::assertAllExpectationsMatched);
} finally {
Loggers.removeAppender(logger, appender);

View File

@ -195,11 +195,6 @@ public class TransportKeepAliveTests extends ESTestCase {
return doSchedule(task, delay);
}
@Override
public ScheduledCancellable schedule(Runnable task, TimeValue delay, String executor) {
return doSchedule(task, delay);
}
private ScheduledCancellable doSchedule(Runnable task, TimeValue delay) {
scheduledTasks.add(new Tuple<>(delay, task));
return null;

View File

@ -561,7 +561,7 @@ public class MockTransportService extends TransportService {
runnable.run();
} else {
requestsToSendWhenCleared.add(runnable);
threadPool.schedule(runnable, delay, ThreadPool.Names.GENERIC);
threadPool.schedule(runnable, delay, threadPool.generic());
}
}
}

View File

@ -19,7 +19,6 @@ import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.threadpool.ThreadPool.Names.GENERIC;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
@ -273,14 +272,14 @@ public class DeterministicTaskQueueTests extends ESTestCase {
final ThreadPool threadPool = taskQueue.getThreadPool();
final long delayMillis = randomLongBetween(1, 100);
threadPool.schedule(() -> strings.add("deferred"), TimeValue.timeValueMillis(delayMillis), GENERIC);
threadPool.schedule(() -> strings.add("deferred"), TimeValue.timeValueMillis(delayMillis), threadPool.generic());
assertFalse(taskQueue.hasRunnableTasks());
assertTrue(taskQueue.hasDeferredTasks());
threadPool.schedule(() -> strings.add("runnable"), TimeValue.ZERO, GENERIC);
threadPool.schedule(() -> strings.add("runnable"), TimeValue.ZERO, threadPool.generic());
assertTrue(taskQueue.hasRunnableTasks());
threadPool.schedule(() -> strings.add("also runnable"), TimeValue.MINUS_ONE, GENERIC);
threadPool.schedule(() -> strings.add("also runnable"), TimeValue.MINUS_ONE, threadPool.generic());
taskQueue.runAllTasks();
@ -290,8 +289,8 @@ public class DeterministicTaskQueueTests extends ESTestCase {
final long delayMillis1 = randomLongBetween(2, 100);
final long delayMillis2 = randomLongBetween(1, delayMillis1 - 1);
threadPool.schedule(() -> strings.add("further deferred"), TimeValue.timeValueMillis(delayMillis1), GENERIC);
threadPool.schedule(() -> strings.add("not quite so deferred"), TimeValue.timeValueMillis(delayMillis2), GENERIC);
threadPool.schedule(() -> strings.add("further deferred"), TimeValue.timeValueMillis(delayMillis1), threadPool.generic());
threadPool.schedule(() -> strings.add("not quite so deferred"), TimeValue.timeValueMillis(delayMillis2), threadPool.generic());
assertFalse(taskQueue.hasRunnableTasks());
assertTrue(taskQueue.hasDeferredTasks());
@ -303,7 +302,7 @@ public class DeterministicTaskQueueTests extends ESTestCase {
final Scheduler.Cancellable cancelledBeforeExecution = threadPool.schedule(
() -> strings.add("cancelled before execution"),
cancelledDelay,
""
threadPool.generic()
);
cancelledBeforeExecution.cancel();
@ -372,7 +371,7 @@ public class DeterministicTaskQueueTests extends ESTestCase {
Scheduler.Cancellable cancellable = threadPool.scheduleWithFixedDelay(
() -> strings.add("periodic-" + counter.getAndIncrement()),
TimeValue.timeValueMillis(intervalMillis),
GENERIC
threadPool.generic()
);
assertFalse(taskQueue.hasRunnableTasks());
assertTrue(taskQueue.hasDeferredTasks());

View File

@ -237,7 +237,7 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask {
removeCompletionListener(id);
listener.onResponse(getResponseWithHeaders());
}
}, waitForCompletion, "generic");
}, waitForCompletion, threadPool.generic());
} catch (Exception exc) {
listener.onFailure(exc);
return;

View File

@ -42,6 +42,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@ -56,7 +57,7 @@ public class AsyncSearchTaskTests extends ESTestCase {
public void beforeTest() {
threadPool = new TestThreadPool(getTestName()) {
@Override
public ScheduledCancellable schedule(Runnable command, TimeValue delay, String executor) {
public ScheduledCancellable schedule(Runnable command, TimeValue delay, Executor executor) {
if (throwOnSchedule) {
throw new RuntimeException();
}

View File

@ -76,6 +76,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
@ -93,6 +94,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
private final Client client;
private final ThreadPool threadPool;
private final Executor ccrExecutor;
private final ClusterService clusterService;
private final IndexScopedSettings indexScopedSettings;
private final TimeValue retentionLeaseRenewInterval;
@ -102,6 +104,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
super(ShardFollowTask.NAME, Ccr.CCR_THREAD_POOL_NAME);
this.client = client;
this.threadPool = threadPool;
this.ccrExecutor = threadPool.executor(getExecutor());
this.clusterService = clusterService;
this.indexScopedSettings = settingsModule.getIndexScopedSettings();
this.retentionLeaseRenewInterval = CcrRetentionLeases.RETENTION_LEASE_RENEW_INTERVAL_SETTING.get(settingsModule.getSettings());
@ -150,11 +153,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
) {
ShardFollowTask params = taskInProgress.getParams();
Client followerClient = wrapClient(client, params.getHeaders(), clusterService.state());
BiConsumer<TimeValue, Runnable> scheduler = (delay, command) -> threadPool.scheduleUnlessShuttingDown(
delay,
Ccr.CCR_THREAD_POOL_NAME,
command
);
BiConsumer<TimeValue, Runnable> scheduler = (delay, command) -> threadPool.scheduleUnlessShuttingDown(delay, ccrExecutor, command);
final String recordedLeaderShardHistoryUUID = getLeaderShardHistoryUUID(params);
return new ShardFollowNodeTask(
@ -528,7 +527,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
remoteClient(params),
listener
);
}, retentionLeaseRenewInterval, Ccr.CCR_THREAD_POOL_NAME);
}, retentionLeaseRenewInterval, ccrExecutor);
}
private void logRetentionLeaseFailure(final String retentionLeaseId, final Throwable cause) {
@ -594,7 +593,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
e
);
try {
threadPool.schedule(() -> nodeOperation(task, params, state), params.getMaxRetryDelay(), Ccr.CCR_THREAD_POOL_NAME);
threadPool.schedule(() -> nodeOperation(task, params, state), params.getMaxRetryDelay(), ccrExecutor);
} catch (EsRejectedExecutionException rex) {
rex.addSuppressed(e);
shardFollowNodeTask.onFatalFailure(rex);

View File

@ -115,7 +115,7 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
BiConsumer<TimeValue, Runnable> scheduler = (delay, task) -> {
assert delay.millis() < 100 : "The delay should be kept to a minimum, so that this test does not take to long to run";
if (stopped.get() == false) {
threadPool.schedule(task, delay, ThreadPool.Names.GENERIC);
threadPool.schedule(task, delay, threadPool.generic());
}
};
List<Translog.Operation> receivedOperations = Collections.synchronizedList(new ArrayList<>());

View File

@ -581,7 +581,7 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
);
final String recordedLeaderIndexHistoryUUID = leaderGroup.getPrimary().getHistoryUUID();
BiConsumer<TimeValue, Runnable> scheduler = (delay, task) -> threadPool.schedule(task, delay, ThreadPool.Names.GENERIC);
BiConsumer<TimeValue, Runnable> scheduler = (delay, task) -> threadPool.schedule(task, delay, threadPool.generic());
AtomicBoolean stopped = new AtomicBoolean(false);
Set<Long> fetchOperations = new HashSet<>();
return new ShardFollowNodeTask(

View File

@ -162,7 +162,7 @@ public class AsyncTaskMaintenanceService extends AbstractLifecycleComponent impl
synchronized void scheduleNextCleanup() {
if (isCleanupRunning) {
try {
cancellable = threadPool.schedule(this::executeNextCleanup, delay, ThreadPool.Names.GENERIC);
cancellable = threadPool.schedule(this::executeNextCleanup, delay, threadPool.generic());
} catch (EsRejectedExecutionException e) {
if (e.isExecutorShutdown()) {
logger.debug("failed to schedule next maintenance task; shutting down", e);

View File

@ -77,16 +77,16 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
// with wrapping the command in RunOnce we ensure the command isn't executed twice, e.g. if the
// future is already running and cancel returns true
this.command = new RunOnce(command);
this.scheduled = threadPool.schedule(command::run, delay, ThreadPool.Names.GENERIC);
this.scheduled = threadPool.schedule(command::run, delay, threadPool.generic());
}
public void reschedule(TimeValue delay) {
// note: cancel return true if the runnable is currently executing
if (scheduled.cancel()) {
if (delay.duration() > 0) {
scheduled = threadPool.schedule(command::run, delay, ThreadPool.Names.GENERIC);
scheduled = threadPool.schedule(command::run, delay, threadPool.generic());
} else {
threadPool.executor(ThreadPool.Names.GENERIC).execute(command::run);
threadPool.generic().execute(command::run);
}
}
}

View File

@ -33,6 +33,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@ -397,9 +398,8 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
}
@Override
public ScheduledCancellable schedule(Runnable command, TimeValue delay, String executor) {
public ScheduledCancellable schedule(Runnable command, TimeValue delay, Executor executor) {
delays.add(delay);
return super.schedule(command, TimeValue.ZERO, executor);
}

View File

@ -110,7 +110,7 @@ public class EnrichPolicyMaintenanceService implements LocalNodeMasterListener {
if (isMaster) {
try {
TimeValue waitTime = EnrichPlugin.ENRICH_CLEANUP_PERIOD.get(settings);
cancellable = threadPool.schedule(this::execute, waitTime, ThreadPool.Names.GENERIC);
cancellable = threadPool.schedule(this::execute, waitTime, threadPool.generic());
} catch (EsRejectedExecutionException e) {
if (e.isExecutorShutdown()) {
logger.debug("Failed to schedule next [enrich] maintenance task; Shutting down", e);

View File

@ -202,7 +202,7 @@ public class AsyncOperatorTests extends ESTestCase {
}
};
TimeValue delay = TimeValue.timeValueMillis(randomIntBetween(0, 50));
threadPool.schedule(command, delay, ESQL_TEST_EXECUTOR);
threadPool.schedule(command, delay, threadPool.executor(ESQL_TEST_EXECUTOR));
}
}
}

View File

@ -166,7 +166,7 @@ public class MlDailyMaintenanceService implements Releasable {
private synchronized void scheduleNext() {
try {
cancellable = threadPool.schedule(this::triggerTasks, schedulerProvider.get(), ThreadPool.Names.GENERIC);
cancellable = threadPool.schedule(this::triggerTasks, schedulerProvider.get(), threadPool.generic());
} catch (EsRejectedExecutionException e) {
if (e.isExecutorShutdown()) {
logger.debug("failed to schedule next maintenance task; shutting down", e);

View File

@ -25,6 +25,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.persistent.PersistentTasksClusterService;
@ -337,7 +338,7 @@ public class TransportStopDatafeedAction extends TransportTasksAction<
threadPool.schedule(
() -> doExecute(task, request, listener, attempt + 1),
TimeValue.timeValueMillis(100L * attempt),
ThreadPool.Names.SAME
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
} else {
listener.onFailure(

View File

@ -341,7 +341,7 @@ public class DatafeedRunner {
doDatafeedRealtime(nextDelayInMsSinceEpoch, jobId, holder);
}
}
}, delay, MachineLearning.DATAFEED_THREAD_POOL_NAME);
}, delay, threadPool.executor(MachineLearning.DATAFEED_THREAD_POOL_NAME));
}
}

View File

@ -56,7 +56,11 @@ abstract class AbstractPyTorchAction<T> extends AbstractInitializableRunnable {
@Override
public final void init() {
if (this.timeoutHandler == null) {
this.timeoutHandler = threadPool.schedule(this::onTimeout, timeout, MachineLearning.UTILITY_THREAD_POOL_NAME);
this.timeoutHandler = threadPool.schedule(
this::onTimeout,
timeout,
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME)
);
}
}

View File

@ -15,19 +15,16 @@ import org.elasticsearch.client.internal.IndicesAdminClient;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.Before;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -37,29 +34,18 @@ public class MlInitializationServiceTests extends ESTestCase {
private static final ClusterName CLUSTER_NAME = new ClusterName("my_cluster");
private ThreadPool threadPool;
private ExecutorService executorService;
private ClusterService clusterService;
private Client client;
private MlAssignmentNotifier mlAssignmentNotifier;
@Before
public void setUpMocks() {
threadPool = mock(ThreadPool.class);
executorService = mock(ExecutorService.class);
final var deterministicTaskQueue = new DeterministicTaskQueue();
threadPool = deterministicTaskQueue.getThreadPool();
clusterService = mock(ClusterService.class);
client = mock(Client.class);
mlAssignmentNotifier = mock(MlAssignmentNotifier.class);
doAnswer(invocation -> {
((Runnable) invocation.getArguments()[0]).run();
return null;
}).when(executorService).execute(any(Runnable.class));
when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executorService);
when(threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME)).thenReturn(executorService);
Scheduler.ScheduledCancellable scheduledCancellable = mock(Scheduler.ScheduledCancellable.class);
when(threadPool.schedule(any(), any(), anyString())).thenReturn(scheduledCancellable);
when(clusterService.getClusterName()).thenReturn(CLUSTER_NAME);
@SuppressWarnings("unchecked")

View File

@ -46,6 +46,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
@ -165,7 +166,7 @@ public abstract class MlSingleNodeTestCase extends ESSingleNodeTestCase {
doAnswer(invocationOnMock -> {
((Runnable) invocationOnMock.getArguments()[0]).run();
return null;
}).when(tp).schedule(any(Runnable.class), any(TimeValue.class), any(String.class));
}).when(tp).schedule(any(Runnable.class), any(TimeValue.class), any(Executor.class));
return tp;
}

View File

@ -47,6 +47,7 @@ import org.mockito.ArgumentCaptor;
import java.net.InetAddress;
import java.util.Collections;
import java.util.Date;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
@ -167,7 +168,7 @@ public class DatafeedRunnerTests extends ESTestCase {
datafeedRunner.run(task, handler);
verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);
verify(threadPool, never()).schedule(any(), any(), anyString());
verify(threadPool, never()).schedule(any(), any(), any(Executor.class));
verify(auditor).warning(JOB_ID, "Datafeed lookback retrieved no data");
}
@ -178,7 +179,7 @@ public class DatafeedRunnerTests extends ESTestCase {
datafeedRunner.run(task, handler);
verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);
verify(threadPool, never()).schedule(any(), any(), anyString());
verify(threadPool, never()).schedule(any(), any(), any(Executor.class));
}
public void testStart_extractionProblem() throws Exception {
@ -188,7 +189,7 @@ public class DatafeedRunnerTests extends ESTestCase {
datafeedRunner.run(task, handler);
verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);
verify(threadPool, never()).schedule(any(), any(), anyString());
verify(threadPool, never()).schedule(any(), any(), any(Executor.class));
verify(auditor, times(1)).error(eq(JOB_ID), anyString());
}
@ -202,7 +203,7 @@ public class DatafeedRunnerTests extends ESTestCase {
r.run();
}
return mock(Scheduler.ScheduledCancellable.class);
}).when(threadPool).schedule(any(), any(), anyString());
}).when(threadPool).schedule(any(), any(), any(Executor.class));
when(datafeedJob.runLookBack(anyLong(), anyLong())).thenThrow(new DatafeedJob.EmptyDataCountException(0L, false));
when(datafeedJob.runRealtime()).thenThrow(new DatafeedJob.EmptyDataCountException(0L, false));
@ -211,7 +212,7 @@ public class DatafeedRunnerTests extends ESTestCase {
DatafeedTask task = createDatafeedTask(DATAFEED_ID, 0L, null);
datafeedRunner.run(task, handler);
verify(threadPool, times(11)).schedule(any(), any(), eq(MachineLearning.DATAFEED_THREAD_POOL_NAME));
verify(threadPool, times(11)).schedule(any(), any(), any(Executor.class));
verify(auditor, times(1)).warning(eq(JOB_ID), anyString());
}
@ -261,13 +262,13 @@ public class DatafeedRunnerTests extends ESTestCase {
task = spyDatafeedTask(task);
datafeedRunner.run(task, handler);
verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);
verify(threadPool, times(2)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);
if (cancelled) {
task.stop("test", StopDatafeedAction.DEFAULT_TIMEOUT);
verify(handler).accept(null);
assertThat(datafeedRunner.isRunning(task), is(false));
} else {
verify(threadPool, times(1)).schedule(any(), eq(new TimeValue(1)), eq(MachineLearning.DATAFEED_THREAD_POOL_NAME));
verify(threadPool, times(1)).schedule(any(), eq(new TimeValue(1)), any(Executor.class));
assertThat(datafeedRunner.isRunning(task), is(true));
}
}
@ -307,7 +308,7 @@ public class DatafeedRunnerTests extends ESTestCase {
capturedClusterStateListener.getValue().clusterChanged(new ClusterChangedEvent("_source", jobOpenedCs.build(), anotherJobCs));
// Now it should run as the job state changed to OPENED
verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);
verify(threadPool, times(2)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);
}
public void testDatafeedTaskWaitsUntilAutodetectCommunicatorIsOpen() {
@ -345,7 +346,7 @@ public class DatafeedRunnerTests extends ESTestCase {
capturedClusterStateListener.getValue().clusterChanged(new ClusterChangedEvent("_source", cs, anotherJobCs));
// Now it should run as the autodetect communicator is open
verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);
verify(threadPool, times(2)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);
}
public void testDatafeedTaskWaitsUntilJobIsNotStale() {
@ -383,7 +384,7 @@ public class DatafeedRunnerTests extends ESTestCase {
capturedClusterStateListener.getValue().clusterChanged(new ClusterChangedEvent("_source", jobOpenedCs.build(), anotherJobCs));
// Now it should run as the job state chanded to OPENED
verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);
verify(threadPool, times(2)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);
}
public void testDatafeedTaskStopsBecauseJobFailedWhileOpening() {

View File

@ -10,6 +10,7 @@ package org.elasticsearch.xpack.ml.inference.deployment;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.Scheduler;
@ -20,6 +21,7 @@ import org.elasticsearch.xpack.ml.inference.pytorch.results.ThreadSettings;
import org.mockito.ArgumentCaptor;
import java.io.IOException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import static org.mockito.ArgumentMatchers.any;
@ -58,9 +60,8 @@ public class ThreadSettingsControlMessagePytorchActionTests extends ESTestCase {
AtomicInteger timeoutCount = new AtomicInteger();
when(processContext.getTimeoutCount()).thenReturn(timeoutCount);
Scheduler.ScheduledCancellable cancellable = mock(Scheduler.ScheduledCancellable.class);
ThreadPool tp = mock(ThreadPool.class);
when(tp.schedule(any(), any(), anyString())).thenReturn(cancellable);
final var deterministicTaskQueue = new DeterministicTaskQueue();
ThreadPool tp = deterministicTaskQueue.getThreadPool();
{
ActionListener<ThreadSettings> listener = mock(ActionListener.class);
@ -116,7 +117,7 @@ public class ThreadSettingsControlMessagePytorchActionTests extends ESTestCase {
Scheduler.ScheduledCancellable cancellable = mock(Scheduler.ScheduledCancellable.class);
ThreadPool tp = mock(ThreadPool.class);
when(tp.schedule(any(), any(), anyString())).thenReturn(cancellable);
when(tp.schedule(any(), any(), any(Executor.class))).thenReturn(cancellable);
ActionListener<ThreadSettings> listener = mock(ActionListener.class);
ArgumentCaptor<BytesReference> messageCapture = ArgumentCaptor.forClass(BytesReference.class);

View File

@ -57,6 +57,7 @@ import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import static org.hamcrest.Matchers.equalTo;
@ -435,7 +436,7 @@ public class JobResultsPersisterTests extends ESTestCase {
doAnswer(invocationOnMock -> {
((Runnable) invocationOnMock.getArguments()[0]).run();
return null;
}).when(tp).schedule(any(Runnable.class), any(TimeValue.class), any(String.class));
}).when(tp).schedule(any(Runnable.class), any(TimeValue.class), any(Executor.class));
return new ResultsPersisterService(tp, client, clusterService, Settings.EMPTY);
}

View File

@ -94,6 +94,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@ -292,7 +293,7 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase {
doAnswer(invocationOnMock -> {
((Runnable) invocationOnMock.getArguments()[0]).run();
return null;
}).when(tp).schedule(any(Runnable.class), any(TimeValue.class), any(String.class));
}).when(tp).schedule(any(Runnable.class), any(TimeValue.class), any(Executor.class));
return tp;
}

View File

@ -49,6 +49,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
@ -414,7 +415,7 @@ public class ResultsPersisterServiceTests extends ESTestCase {
doAnswer(invocationOnMock -> {
((Runnable) invocationOnMock.getArguments()[0]).run();
return null;
}).when(tp).schedule(any(Runnable.class), any(TimeValue.class), any(String.class));
}).when(tp).schedule(any(Runnable.class), any(TimeValue.class), any(Executor.class));
return new ResultsPersisterService(tp, client, clusterService, Settings.EMPTY);
}
}

View File

@ -214,7 +214,7 @@ public class AsyncTaskManagementService<
if (acquiredListener != null) {
acquiredListener.onResponse(operation.initialResponse(searchTask));
}
}, waitForCompletionTimeout, ThreadPool.Names.SEARCH);
}, waitForCompletionTimeout, threadPool.executor(ThreadPool.Names.SEARCH));
// This will be performed at the end of normal execution
return ActionListener.wrap(response -> {
ActionListener<Response> acquiredListener = exclusiveListener.getAndSet(null);

View File

@ -219,7 +219,7 @@ public class BlobStoreCacheMaintenanceService implements ClusterStateListener {
final TimeValue delay = periodicTaskInterval;
if (delay.getMillis() > 0L) {
final PeriodicMaintenanceTask task = new PeriodicMaintenanceTask(periodicTaskKeepAlive, periodicTaskBatchSize);
periodicTask = threadPool.schedule(task, delay, ThreadPool.Names.GENERIC);
periodicTask = threadPool.schedule(task, delay, threadPool.generic());
} else {
periodicTask = null;
}

View File

@ -187,7 +187,7 @@ public class InitialNodeSecurityAutoConfiguration {
}
}, backoff);
}
}, TimeValue.timeValueSeconds(9), ThreadPool.Names.GENERIC));
}, TimeValue.timeValueSeconds(9), threadPool.generic()));
}
});
}

View File

@ -144,7 +144,6 @@ import static org.elasticsearch.common.hash.MessageDigests.sha256;
import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
import static org.elasticsearch.search.SearchService.DEFAULT_KEEPALIVE_SETTING;
import static org.elasticsearch.threadpool.ThreadPool.Names.GENERIC;
import static org.elasticsearch.xpack.core.ClientHelper.SECURITY_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
@ -1004,7 +1003,7 @@ public final class TokenService {
listener
),
backoff.next(),
GENERIC
client.threadPool().generic()
);
} else {
if (retryTokenDocIds.isEmpty() == false) {
@ -1048,7 +1047,7 @@ public final class TokenService {
listener
),
backoff.next(),
GENERIC
client.threadPool().generic()
);
} else {
listener.onFailure(e);
@ -1161,7 +1160,7 @@ public final class TokenService {
.schedule(
() -> findTokenFromRefreshToken(refreshToken, tokensIndexManager, backoff, listener),
backofTimeValue,
GENERIC
client.threadPool().generic()
);
} else {
logger.warn("failed to find token from refresh token after all retries");
@ -1331,7 +1330,7 @@ public final class TokenService {
.schedule(
() -> innerRefresh(refreshToken, tokenDoc, clientAuth, backoff, refreshRequested, listener),
backoff.next(),
GENERIC
client.threadPool().generic()
);
} else {
logger.info(
@ -1367,7 +1366,7 @@ public final class TokenService {
.schedule(
() -> getTokenDocAsync(tokenDoc.id(), refreshedTokenIndex, true, this),
backoff.next(),
GENERIC
client.threadPool().generic()
);
} else {
logger.warn("could not get token document [{}] for refresh after all retries", tokenDoc.id());
@ -1385,7 +1384,7 @@ public final class TokenService {
.schedule(
() -> innerRefresh(refreshToken, tokenDoc, clientAuth, backoff, refreshRequested, listener),
backoff.next(),
GENERIC
client.threadPool().generic()
);
} else {
logger.warn("failed to update the original token document [{}], after all retries", tokenDoc.id());
@ -1455,7 +1454,11 @@ public final class TokenService {
if (backoff.hasNext()) {
logger.info("could not get token document [{}] that should have been created, retrying", tokenDocId);
client.threadPool()
.schedule(() -> getTokenDocAsync(tokenDocId, tokensIndex, false, actionListener), backoff.next(), GENERIC);
.schedule(
() -> getTokenDocAsync(tokenDocId, tokensIndex, false, actionListener),
backoff.next(),
client.threadPool().generic()
);
} else {
logger.warn("could not get token document [{}] that should have been created after all retries", tokenDocId);
onFailure.accept(invalidGrantException("could not refresh the requested token"));

View File

@ -14,12 +14,12 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.core.security.authc.AuthenticationResult;
import org.elasticsearch.xpack.core.security.authc.Realm;
@ -159,7 +159,7 @@ public final class LdapRealm extends CachingUsernamePasswordRealm {
logger
);
threadPool.generic().execute(cancellableLdapRunnable);
threadPool.schedule(cancellableLdapRunnable::maybeTimeout, executionTimeout, Names.SAME);
threadPool.schedule(cancellableLdapRunnable::maybeTimeout, executionTimeout, EsExecutors.DIRECT_EXECUTOR_SERVICE);
}
@Override
@ -181,7 +181,7 @@ public final class LdapRealm extends CachingUsernamePasswordRealm {
logger
);
threadPool.generic().execute(cancellableLdapRunnable);
threadPool.schedule(cancellableLdapRunnable::maybeTimeout, executionTimeout, Names.SAME);
threadPool.schedule(cancellableLdapRunnable::maybeTimeout, executionTimeout, EsExecutors.DIRECT_EXECUTOR_SERVICE);
} else {
userActionListener.onResponse(null);
}

View File

@ -38,7 +38,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import static org.elasticsearch.threadpool.ThreadPool.Names.GENERIC;
import static org.elasticsearch.xpack.core.ClientHelper.SECURITY_ORIGIN;
import static org.elasticsearch.xpack.core.XPackSettings.ENROLLMENT_ENABLED;
@ -76,7 +75,8 @@ public class InternalEnrollmentTokenGenerator extends BaseEnrollmentTokenGenerat
if (null == transportInfo || null == httpInfo) {
if (backoff.hasNext()) {
LOGGER.debug("Local node's HTTP/transport info is not yet available, will retry...");
client.threadPool().schedule(() -> maybeCreateNodeEnrollmentToken(consumer, backoff), backoff.next(), GENERIC);
client.threadPool()
.schedule(() -> maybeCreateNodeEnrollmentToken(consumer, backoff), backoff.next(), client.threadPool().generic());
} else {
LOGGER.warn("Unable to get local node's HTTP/transport info after all retries.");
consumer.accept(null);
@ -141,7 +141,8 @@ public class InternalEnrollmentTokenGenerator extends BaseEnrollmentTokenGenerat
if (null == httpInfo) {
if (backoff.hasNext()) {
LOGGER.info("Local node's HTTP info is not yet available, will retry...");
client.threadPool().schedule(() -> createKibanaEnrollmentToken(consumer, backoff), backoff.next(), GENERIC);
client.threadPool()
.schedule(() -> createKibanaEnrollmentToken(consumer, backoff), backoff.next(), client.threadPool().generic());
} else {
LOGGER.warn("Unable to get local node's HTTP info after all retries.");
consumer.accept(null);

View File

@ -744,7 +744,7 @@ public class ProfileService {
.schedule(
() -> getOrCreateProfileWithBackoff(subject, profileDocument, backoff, listener),
backoffTimeValue,
ThreadPool.Names.GENERIC
client.threadPool().generic()
);
} else {
// Retry has depleted. This can only happen when the document or the profile index itself gets deleted