mirror of https://mirror.osredm.com/root/redis.git
Gradually reduce defrag CPU usage when defragmentation is ineffective (#13752)
This PR addresses an issue where, if a module does not provide a defragmentation callback, we cannot defragment the fragmentation it generates. The defragmentation process nevertheless still sees a large amount of fragmentation, leading to more aggressive defragmentation efforts that ultimately have no effect. To mitigate this, the PR introduces a mechanism that gradually reduces the CPU consumed by defragmentation when its effectiveness is poor: when the fragmentation rate drops below 2% and the hit ratio is less than 1%, or when the fragmentation rate increases by no more than 2%. CPU consumption is gradually decreased until it reaches the minimum threshold defined by `active-defrag-cycle-min`.

---------

Co-authored-by: oranagra <oran@redislabs.com>
Commit f86575f210 (parent dcd0b3d020)
src/defrag.c (37 lines changed)
@@ -14,6 +14,7 @@
 #include "server.h"
 #include <stddef.h>
+#include <math.h>
 
 #ifdef HAVE_DEFRAG
 
 
@@ -1024,7 +1025,7 @@ int defragLaterStep(redisDb *db, int slot, long long endtime) {
 #define LIMIT(y, min, max) ((y)<(min)? min: ((y)>(max)? max: (y)))
 
 /* decide if defrag is needed, and at what CPU effort to invest in it */
-void computeDefragCycles(void) {
+void computeDefragCycles(float decay_rate) {
     size_t frag_bytes;
     float frag_pct = getAllocatorFragmentation(&frag_bytes);
     /* If we're not already running, and below the threshold, exit. */
@@ -1040,6 +1041,7 @@ void computeDefragCycles(void) {
                 server.active_defrag_threshold_upper,
                 server.active_defrag_cycle_min,
                 server.active_defrag_cycle_max);
+        cpu_pct *= decay_rate;
         cpu_pct = LIMIT(cpu_pct,
                 server.active_defrag_cycle_min,
                 server.active_defrag_cycle_max);
@@ -1068,7 +1070,9 @@ void activeDefragCycle(void) {
     static int defrag_stage = 0;
     static unsigned long defrag_cursor = 0;
     static redisDb *db = NULL;
-    static long long start_scan, start_stat;
+    static long long start_scan, start_hits, start_misses;
+    static float start_frag_pct;
+    static float decay_rate = 1.0f;
     unsigned int iterations = 0;
     unsigned long long prev_defragged = server.stat_active_defrag_hits;
     unsigned long long prev_scanned = server.stat_active_defrag_scanned;
@@ -1104,13 +1108,13 @@ void activeDefragCycle(void) {
     /* Once a second, check if the fragmentation justfies starting a scan
      * or making it more aggressive. */
     run_with_period(1000) {
-        computeDefragCycles();
+        computeDefragCycles(decay_rate);
    }
 
     /* Normally it is checked once a second, but when there is a configuration
      * change, we want to check it as soon as possible. */
     if (server.active_defrag_configuration_changed) {
-        computeDefragCycles();
+        computeDefragCycles(decay_rate);
         server.active_defrag_configuration_changed = 0;
     }
 
@@ -1148,7 +1152,7 @@ void activeDefragCycle(void) {
            float frag_pct = getAllocatorFragmentation(&frag_bytes);
            serverLog(LL_VERBOSE,
                "Active defrag done in %dms, reallocated=%d, frag=%.0f%%, frag_bytes=%zu",
-               (int)((now - start_scan)/1000), (int)(server.stat_active_defrag_hits - start_stat), frag_pct, frag_bytes);
+               (int)((now - start_scan)/1000), (int)(server.stat_active_defrag_hits - start_hits), frag_pct, frag_bytes);
 
            start_scan = now;
            current_db = -1;
@@ -1159,9 +1163,26 @@ void activeDefragCycle(void) {
                db = NULL;
                server.active_defrag_running = 0;
 
+               long long last_hits = server.stat_active_defrag_hits - start_hits;
+               long long last_misses = server.stat_active_defrag_misses - start_misses;
+               float last_frag_pct_change = start_frag_pct - frag_pct;
+               /* When defragmentation efficiency is low, we gradually reduce the
+                * speed for the next cycle to avoid CPU waste. However, in the
+                * following two cases, we keep the normal speed:
+                * 1) If the fragmentation percentage has increased or decreased by more than 2%.
+                * 2) If the fragmentation percentage decrease is small, but hits are above 1%,
+                *    we still keep the normal speed. */
+               if (fabs(last_frag_pct_change) > 2 ||
+                   (last_frag_pct_change < 0 && last_hits >= (last_hits + last_misses) * 0.01))
+               {
+                   decay_rate = 1.0f;
+               } else {
+                   decay_rate *= 0.9;
+               }
+
                moduleDefragEnd();
 
-               computeDefragCycles(); /* if another scan is needed, start it right away */
+               computeDefragCycles(decay_rate); /* if another scan is needed, start it right away */
                if (server.active_defrag_running != 0 && ustime() < endtime)
                    continue;
                break;
@@ -1169,7 +1190,9 @@ void activeDefragCycle(void) {
            else if (current_db==0) {
                /* Start a scan from the first database. */
                start_scan = ustime();
-               start_stat = server.stat_active_defrag_hits;
+               start_hits = server.stat_active_defrag_hits;
+               start_misses = server.stat_active_defrag_misses;
+               start_frag_pct = getAllocatorFragmentation(NULL);
            }
 
            db = &server.db[current_db];
@@ -136,4 +136,100 @@ start_server {tags {"modules"}} {
         assert_equal 1 [llength $keys]
     }
 
+    if {[string match {*jemalloc*} [s mem_allocator]] && [r debug mallctl arenas.page] <= 8192} {
+        test {Reduce defrag CPU usage when module data can't be defragged} {
+            r flushdb
+            r config set hz 100
+            r config set activedefrag no
+            r config set active-defrag-threshold-lower 5
+            r config set active-defrag-cycle-min 25
+            r config set active-defrag-cycle-max 75
+            r config set active-defrag-ignore-bytes 100kb
+
+            # Populate memory with interleaving field of same size.
+            set n 20000
+            set dummy "[string repeat x 400]"
+            set rd [redis_deferring_client]
+            for {set i 0} {$i < $n} {incr i} { $rd datatype.set k$i 1 $dummy }
+            for {set i 0} {$i < [expr $n]} {incr i} { $rd read } ;# Discard replies
+
+            after 120 ;# serverCron only updates the info once in 100ms
+            if {$::verbose} {
+                puts "used [s allocator_allocated]"
+                puts "rss [s allocator_active]"
+                puts "frag [s allocator_frag_ratio]"
+                puts "frag_bytes [s allocator_frag_bytes]"
+            }
+            assert_lessthan [s allocator_frag_ratio] 1.05
+
+            for {set i 0} {$i < $n} {incr i 2} { $rd del k$i }
+            for {set j 0} {$j < $n} {incr j 2} { $rd read } ; # Discard del replies
+            after 120 ;# serverCron only updates the info once in 100ms
+            assert_morethan [s allocator_frag_ratio] 1.4
+
+            catch {r config set activedefrag yes} e
+            if {[r config get activedefrag] eq "activedefrag yes"} {
+                # wait for the active defrag to start working (decision once a second)
+                wait_for_condition 50 100 {
+                    [s total_active_defrag_time] ne 0
+                } else {
+                    after 120 ;# serverCron only updates the info once in 100ms
+                    puts [r info memory]
+                    puts [r info stats]
+                    puts [r memory malloc-stats]
+                    fail "defrag not started."
+                }
+                assert_morethan [s allocator_frag_ratio] 1.4
+
+                # The cpu usage of defragment will drop to active-defrag-cycle-min
+                wait_for_condition 1000 50 {
+                    [s active_defrag_running] == 25
+                } else {
+                    fail "Unable to reduce the defragmentation speed."
+                }
+
+                # Fuzzy test to restore defragmentation speed to normal
+                set end_time [expr {[clock seconds] + 10}]
+                set speed_restored 0
+                while {[clock seconds] < $end_time} {
+                    switch [expr {int(rand() * 3)}] {
+                        0 {
+                            # Randomly delete a key
+                            set random_key [r RANDOMKEY]
+                            if {$random_key != ""} {
+                                r DEL $random_key
+                            }
+                        }
+                        1 {
+                            # Randomly overwrite a key
+                            set random_key [r RANDOMKEY]
+                            if {$random_key != ""} {
+                                r datatype.set $random_key 1 $dummy
+                            }
+                        }
+                        2 {
+                            # Randomly generate a new key
+                            set random_key "key_[expr {int(rand() * 10000)}]"
+                            r datatype.set $random_key 1 $dummy
+                        }
+                    }
+
+                    # Wait for defragmentation speed to restore.
+                    if {[s active_defrag_running] > 25} {
+                        set speed_restored 1
+                        break;
+                    }
+                }
+                assert_equal $speed_restored 1
+
+                # After the traffic disappears, the defragmentation speed will decrease again.
+                wait_for_condition 1000 50 {
+                    [s active_defrag_running] == 25
+                } else {
+                    fail "Unable to reduce the defragmentation speed after traffic disappears."
+                }
+            }
+        }
+    }
 }