Add reads/writes metrics for IO threads (#13703)

The main job of an IO thread is to read queries and write replies, so
reads/writes metrics reflect the workload of the IO threads. We now also
report the `io_threaded_reads/writes_processed` metrics in detail for
each IO thread.

Of course, to avoid breaking changes, `io_threaded_reads/writes_processed`
is still there. However, before the async IO threads commit, the sum could
include the IO done by the main thread when IO threads were active; now we
only sum the IO done by the IO threads.

The threads section of the `info` command output now looks as follows:
```
# Threads
io_thread_0:clients=0,reads=0,writes=0
io_thread_1:clients=54,reads=6546940,writes=6546919
io_thread_2:clients=54,reads=6513650,writes=6513625
io_thread_3:clients=54,reads=6396571,writes=6396525
io_thread_4:clients=53,reads=6511120,writes=6511097
io_thread_5:clients=53,reads=6539302,writes=6539280
io_thread_6:clients=53,reads=6502269,writes=6502248
```
This commit is contained in:
Yuan Wang 2025-01-06 15:59:02 +08:00 committed by GitHub
parent 04f63d4af7
commit 8e9f5146dd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 45 additions and 33 deletions

View File

@ -2060,11 +2060,8 @@ int _writeToClient(client *c, ssize_t *nwritten) {
* thread safe. */
int writeToClient(client *c, int handler_installed) {
if (!(c->io_flags & CLIENT_IO_WRITE_ENABLED)) return C_OK;
/* Update total number of writes on server */
atomicIncr(server.stat_total_writes_processed, 1);
if (c->running_tid != IOTHREAD_MAIN_THREAD_ID) {
atomicIncr(server.stat_io_writes_processed, 1);
}
/* Update the number of writes of io threads on server */
atomicIncr(server.stat_io_writes_processed[c->running_tid], 1);
ssize_t nwritten = 0, totwritten = 0;
@ -2833,11 +2830,8 @@ void readQueryFromClient(connection *conn) {
if (!(c->io_flags & CLIENT_IO_READ_ENABLED)) return;
c->read_error = 0;
/* Update total number of reads on server */
atomicIncr(server.stat_total_reads_processed, 1);
if (c->running_tid != IOTHREAD_MAIN_THREAD_ID) {
atomicIncr(server.stat_io_reads_processed, 1);
}
/* Update the number of reads of io threads on server */
atomicIncr(server.stat_io_reads_processed[c->running_tid], 1);
readlen = PROTO_IOBUF_LEN;
/* If this is a multi bulk request, and we are processing a bulk reply

View File

@ -2616,10 +2616,10 @@ void resetServerStats(void) {
server.stat_sync_full = 0;
server.stat_sync_partial_ok = 0;
server.stat_sync_partial_err = 0;
atomicSet(server.stat_io_reads_processed, 0);
atomicSet(server.stat_total_reads_processed, 0);
atomicSet(server.stat_io_writes_processed, 0);
atomicSet(server.stat_total_writes_processed, 0);
for (j = 0; j < IO_THREADS_MAX_NUM; j++) {
atomicSet(server.stat_io_reads_processed[j], 0);
atomicSet(server.stat_io_writes_processed[j], 0);
}
atomicSet(server.stat_client_qbuf_limit_disconnections, 0);
server.stat_client_outbuf_limit_disconnections = 0;
for (j = 0; j < STATS_METRIC_COUNT; j++) {
@ -5912,9 +5912,29 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
}
}
/* Threads */
int stat_io_ops_processed_calculated = 0;
long long stat_io_reads_processed = 0, stat_io_writes_processed = 0;
long long stat_total_reads_processed = 0, stat_total_writes_processed = 0;
if (all_sections || (dictFind(section_dict,"threads") != NULL)) {
if (sections++) info = sdscat(info,"\r\n");
info = sdscatprintf(info, "# Threads\r\n");
long long reads, writes;
for (j = 0; j < server.io_threads_num; j++) {
atomicGet(server.stat_io_reads_processed[j], reads);
atomicGet(server.stat_io_writes_processed[j], writes);
info = sdscatprintf(info, "io_thread_%d:clients=%d,reads=%lld,writes=%lld\r\n",
j, server.io_threads_clients_num[j], reads, writes);
stat_total_reads_processed += reads;
if (j != 0) stat_io_reads_processed += reads; /* Skip the main thread */
stat_total_writes_processed += writes;
if (j != 0) stat_io_writes_processed += writes; /* Skip the main thread */
}
stat_io_ops_processed_calculated = 1;
}
/* Stats */
if (all_sections || (dictFind(section_dict,"stats") != NULL)) {
long long stat_total_reads_processed, stat_total_writes_processed;
long long stat_net_input_bytes, stat_net_output_bytes;
long long stat_net_repl_input_bytes, stat_net_repl_output_bytes;
long long current_eviction_exceeded_time = server.stat_last_eviction_exceeded_time ?
@ -5922,16 +5942,25 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
long long current_active_defrag_time = server.stat_last_active_defrag_time ?
(long long) elapsedUs(server.stat_last_active_defrag_time): 0;
long long stat_client_qbuf_limit_disconnections;
long long stat_io_reads_processed, stat_io_writes_processed;
atomicGet(server.stat_total_reads_processed, stat_total_reads_processed);
atomicGet(server.stat_total_writes_processed, stat_total_writes_processed);
atomicGet(server.stat_net_input_bytes, stat_net_input_bytes);
atomicGet(server.stat_net_output_bytes, stat_net_output_bytes);
atomicGet(server.stat_net_repl_input_bytes, stat_net_repl_input_bytes);
atomicGet(server.stat_net_repl_output_bytes, stat_net_repl_output_bytes);
atomicGet(server.stat_client_qbuf_limit_disconnections, stat_client_qbuf_limit_disconnections);
atomicGet(server.stat_io_reads_processed, stat_io_reads_processed);
atomicGet(server.stat_io_writes_processed, stat_io_writes_processed);
/* If we calculated the total reads and writes in the threads section,
* we don't need to do it again, and also keep the values consistent. */
if (!stat_io_ops_processed_calculated) {
long long reads, writes;
for (j = 0; j < server.io_threads_num; j++) {
atomicGet(server.stat_io_reads_processed[j], reads);
stat_total_reads_processed += reads;
if (j != 0) stat_io_reads_processed += reads; /* Skip the main thread */
atomicGet(server.stat_io_writes_processed[j], writes);
stat_total_writes_processed += writes;
if (j != 0) stat_io_writes_processed += writes; /* Skip the main thread */
}
}
if (sections++) info = sdscat(info,"\r\n");
info = sdscatprintf(info, "# Stats\r\n" FMTARGS(
@ -6133,15 +6162,6 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
#endif /* RUSAGE_THREAD */
}
/* Threads */
if (all_sections || (dictFind(section_dict,"threads") != NULL)) {
if (sections++) info = sdscat(info,"\r\n");
info = sdscatprintf(info, "# Threads\r\n");
for (j = 0; j < server.io_threads_num; j++) {
info = sdscatprintf(info, "io_thread_%d:clients=%d\r\n", j, server.io_threads_clients_num[j]);
}
}
/* Modules */
if (all_sections || (dictFind(section_dict,"module_list") != NULL) || (dictFind(section_dict,"modules") != NULL)) {
if (sections++) info = sdscat(info,"\r\n");

View File

@ -1768,10 +1768,8 @@ struct redisServer {
long long stat_unexpected_error_replies; /* Number of unexpected (aof-loading, replica to master, etc.) error replies */
long long stat_total_error_replies; /* Total number of issued error replies ( command + rejected errors ) */
long long stat_dump_payload_sanitizations; /* Number deep dump payloads integrity validations. */
redisAtomic long long stat_io_reads_processed; /* Number of read events processed by IO / Main threads */
redisAtomic long long stat_io_writes_processed; /* Number of write events processed by IO / Main threads */
redisAtomic long long stat_total_reads_processed; /* Total number of read events processed */
redisAtomic long long stat_total_writes_processed; /* Total number of write events processed */
redisAtomic long long stat_io_reads_processed[IO_THREADS_MAX_NUM]; /* Number of read events processed by IO / Main threads */
redisAtomic long long stat_io_writes_processed[IO_THREADS_MAX_NUM]; /* Number of write events processed by IO / Main threads */
redisAtomic long long stat_client_qbuf_limit_disconnections; /* Total number of clients reached query buf length limit */
long long stat_client_outbuf_limit_disconnections; /* Total number of clients reached output buf length limit */
/* The following two are used to track instantaneous metrics, like