diff --git a/src/networking.c b/src/networking.c index 589b3ed24..94303e1b9 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2060,11 +2060,8 @@ int _writeToClient(client *c, ssize_t *nwritten) { * thread safe. */ int writeToClient(client *c, int handler_installed) { if (!(c->io_flags & CLIENT_IO_WRITE_ENABLED)) return C_OK; - /* Update total number of writes on server */ - atomicIncr(server.stat_total_writes_processed, 1); - if (c->running_tid != IOTHREAD_MAIN_THREAD_ID) { - atomicIncr(server.stat_io_writes_processed, 1); - } + /* Update the number of writes of io threads on server */ + atomicIncr(server.stat_io_writes_processed[c->running_tid], 1); ssize_t nwritten = 0, totwritten = 0; @@ -2833,11 +2830,8 @@ void readQueryFromClient(connection *conn) { if (!(c->io_flags & CLIENT_IO_READ_ENABLED)) return; c->read_error = 0; - /* Update total number of reads on server */ - atomicIncr(server.stat_total_reads_processed, 1); - if (c->running_tid != IOTHREAD_MAIN_THREAD_ID) { - atomicIncr(server.stat_io_reads_processed, 1); - } + /* Update the number of reads of io threads on server */ + atomicIncr(server.stat_io_reads_processed[c->running_tid], 1); readlen = PROTO_IOBUF_LEN; /* If this is a multi bulk request, and we are processing a bulk reply diff --git a/src/server.c b/src/server.c index e9ab2a74b..7bc39c411 100644 --- a/src/server.c +++ b/src/server.c @@ -2616,10 +2616,10 @@ void resetServerStats(void) { server.stat_sync_full = 0; server.stat_sync_partial_ok = 0; server.stat_sync_partial_err = 0; - atomicSet(server.stat_io_reads_processed, 0); - atomicSet(server.stat_total_reads_processed, 0); - atomicSet(server.stat_io_writes_processed, 0); - atomicSet(server.stat_total_writes_processed, 0); + for (j = 0; j < IO_THREADS_MAX_NUM; j++) { + atomicSet(server.stat_io_reads_processed[j], 0); + atomicSet(server.stat_io_writes_processed[j], 0); + } atomicSet(server.stat_client_qbuf_limit_disconnections, 0); server.stat_client_outbuf_limit_disconnections = 0; for (j = 0; j < STATS_METRIC_COUNT; j++) { @@ -5912,9 +5912,29 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) { } } + /* Threads */ + int stat_io_ops_processed_calculated = 0; + long long stat_io_reads_processed = 0, stat_io_writes_processed = 0; + long long stat_total_reads_processed = 0, stat_total_writes_processed = 0; + if (all_sections || (dictFind(section_dict,"threads") != NULL)) { + if (sections++) info = sdscat(info,"\r\n"); + info = sdscatprintf(info, "# Threads\r\n"); + long long reads, writes; + for (j = 0; j < server.io_threads_num; j++) { + atomicGet(server.stat_io_reads_processed[j], reads); + atomicGet(server.stat_io_writes_processed[j], writes); + info = sdscatprintf(info, "io_thread_%d:clients=%d,reads=%lld,writes=%lld\r\n", + j, server.io_threads_clients_num[j], reads, writes); + stat_total_reads_processed += reads; + if (j != 0) stat_io_reads_processed += reads; /* Skip the main thread */ + stat_total_writes_processed += writes; + if (j != 0) stat_io_writes_processed += writes; /* Skip the main thread */ + } + stat_io_ops_processed_calculated = 1; + } + /* Stats */ if (all_sections || (dictFind(section_dict,"stats") != NULL)) { - long long stat_total_reads_processed, stat_total_writes_processed; long long stat_net_input_bytes, stat_net_output_bytes; long long stat_net_repl_input_bytes, stat_net_repl_output_bytes; long long current_eviction_exceeded_time = server.stat_last_eviction_exceeded_time ? @@ -5922,16 +5942,25 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) { long long current_active_defrag_time = server.stat_last_active_defrag_time ? (long long) elapsedUs(server.stat_last_active_defrag_time): 0; long long stat_client_qbuf_limit_disconnections; - long long stat_io_reads_processed, stat_io_writes_processed; - atomicGet(server.stat_total_reads_processed, stat_total_reads_processed); - atomicGet(server.stat_total_writes_processed, stat_total_writes_processed); atomicGet(server.stat_net_input_bytes, stat_net_input_bytes); atomicGet(server.stat_net_output_bytes, stat_net_output_bytes); atomicGet(server.stat_net_repl_input_bytes, stat_net_repl_input_bytes); atomicGet(server.stat_net_repl_output_bytes, stat_net_repl_output_bytes); atomicGet(server.stat_client_qbuf_limit_disconnections, stat_client_qbuf_limit_disconnections); - atomicGet(server.stat_io_reads_processed, stat_io_reads_processed); - atomicGet(server.stat_io_writes_processed, stat_io_writes_processed); + + /* If we calculated the total reads and writes in the threads section, + * we don't need to do it again, and also keep the values consistent. */ + if (!stat_io_ops_processed_calculated) { + long long reads, writes; + for (j = 0; j < server.io_threads_num; j++) { + atomicGet(server.stat_io_reads_processed[j], reads); + stat_total_reads_processed += reads; + if (j != 0) stat_io_reads_processed += reads; /* Skip the main thread */ + atomicGet(server.stat_io_writes_processed[j], writes); + stat_total_writes_processed += writes; + if (j != 0) stat_io_writes_processed += writes; /* Skip the main thread */ + } + } if (sections++) info = sdscat(info,"\r\n"); info = sdscatprintf(info, "# Stats\r\n" FMTARGS( @@ -6133,15 +6162,6 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) { #endif /* RUSAGE_THREAD */ } - /* Threads */ - if (all_sections || (dictFind(section_dict,"threads") != NULL)) { - if (sections++) info = sdscat(info,"\r\n"); - info = sdscatprintf(info, "# Threads\r\n"); - for (j = 0; j < server.io_threads_num; j++) { - info = sdscatprintf(info, "io_thread_%d:clients=%d\r\n", j, server.io_threads_clients_num[j]); - } - } - /* Modules */ if (all_sections || (dictFind(section_dict,"module_list") != NULL) || (dictFind(section_dict,"modules") != NULL)) { if (sections++) info = sdscat(info,"\r\n"); diff --git a/src/server.h b/src/server.h index 3fe562184..49ab7b708 100644 --- a/src/server.h +++ b/src/server.h @@ -1768,10 +1768,8 @@ struct redisServer { long long stat_unexpected_error_replies; /* Number of unexpected (aof-loading, replica to master, etc.) error replies */ long long stat_total_error_replies; /* Total number of issued error replies ( command + rejected errors ) */ long long stat_dump_payload_sanitizations; /* Number deep dump payloads integrity validations. */ - redisAtomic long long stat_io_reads_processed; /* Number of read events processed by IO / Main threads */ - redisAtomic long long stat_io_writes_processed; /* Number of write events processed by IO / Main threads */ - redisAtomic long long stat_total_reads_processed; /* Total number of read events processed */ - redisAtomic long long stat_total_writes_processed; /* Total number of write events processed */ + redisAtomic long long stat_io_reads_processed[IO_THREADS_MAX_NUM]; /* Number of read events processed by IO / Main threads */ + redisAtomic long long stat_io_writes_processed[IO_THREADS_MAX_NUM]; /* Number of write events processed by IO / Main threads */ redisAtomic long long stat_client_qbuf_limit_disconnections; /* Total number of clients reached query buf length limit */ long long stat_client_outbuf_limit_disconnections; /* Total number of clients reached output buf length limit */ /* The following two are used to track instantaneous metrics, like