AOF: remove memmove in aofChildWriteDiffData and record the latency (#5362)

Improve performance by avoiding inefficiencies in the parent process during AOFRW.
* AOF: record the latency of aofChildWriteDiffData
* AOF: avoid memmove in aofChildWriteDiffData
This commit is contained in:
zhaozhao.zz 2021-05-17 21:57:17 +08:00 committed by GitHub
parent fbc0e2b834
commit 0ecc814c81
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 15 additions and 12 deletions

View File

@ -60,7 +60,7 @@ void aofClosePipes(void);
#define AOF_RW_BUF_BLOCK_SIZE (1024*1024*10) /* 10 MB per block */ #define AOF_RW_BUF_BLOCK_SIZE (1024*1024*10) /* 10 MB per block */
typedef struct aofrwblock { typedef struct aofrwblock {
unsigned long used, free; unsigned long used, free, pos;
char buf[AOF_RW_BUF_BLOCK_SIZE]; char buf[AOF_RW_BUF_BLOCK_SIZE];
} aofrwblock; } aofrwblock;
@ -96,29 +96,31 @@ void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) {
listNode *ln; listNode *ln;
aofrwblock *block; aofrwblock *block;
ssize_t nwritten; ssize_t nwritten;
mstime_t latency;
UNUSED(el); UNUSED(el);
UNUSED(fd); UNUSED(fd);
UNUSED(privdata); UNUSED(privdata);
UNUSED(mask); UNUSED(mask);
latencyStartMonitor(latency);
while(1) { while(1) {
ln = listFirst(server.aof_rewrite_buf_blocks); ln = listFirst(server.aof_rewrite_buf_blocks);
block = ln ? ln->value : NULL; block = ln ? ln->value : NULL;
if (server.aof_stop_sending_diff || !block) { if (server.aof_stop_sending_diff || !block) {
aeDeleteFileEvent(server.el,server.aof_pipe_write_data_to_child, aeDeleteFileEvent(server.el,server.aof_pipe_write_data_to_child,
AE_WRITABLE); AE_WRITABLE);
return; break;
} }
if (block->used > 0) { if (block->used != block->pos) {
nwritten = write(server.aof_pipe_write_data_to_child, nwritten = write(server.aof_pipe_write_data_to_child,
block->buf,block->used); block->buf+block->pos,block->used-block->pos);
if (nwritten <= 0) return; if (nwritten <= 0) break;
memmove(block->buf,block->buf+nwritten,block->used-nwritten); block->pos += nwritten;
block->used -= nwritten;
block->free += nwritten;
} }
if (block->used == 0) listDelNode(server.aof_rewrite_buf_blocks,ln); if (block->used == block->pos) listDelNode(server.aof_rewrite_buf_blocks,ln);
} }
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("aof-rewrite-write-data-to-child",latency);
} }
/* Append data to the AOF rewrite buffer, allocating new blocks if needed. */ /* Append data to the AOF rewrite buffer, allocating new blocks if needed. */
@ -146,6 +148,7 @@ void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
block = zmalloc(sizeof(*block)); block = zmalloc(sizeof(*block));
block->free = AOF_RW_BUF_BLOCK_SIZE; block->free = AOF_RW_BUF_BLOCK_SIZE;
block->used = 0; block->used = 0;
block->pos = 0;
listAddNodeTail(server.aof_rewrite_buf_blocks,block); listAddNodeTail(server.aof_rewrite_buf_blocks,block);
/* Log every time we cross more 10 or 100 blocks, respectively /* Log every time we cross more 10 or 100 blocks, respectively
@ -181,9 +184,9 @@ ssize_t aofRewriteBufferWrite(int fd) {
aofrwblock *block = listNodeValue(ln); aofrwblock *block = listNodeValue(ln);
ssize_t nwritten; ssize_t nwritten;
if (block->used) { if (block->used != block->pos) {
nwritten = write(fd,block->buf,block->used); nwritten = write(fd,block->buf+block->pos,block->used-block->pos);
if (nwritten != (ssize_t)block->used) { if (nwritten != (ssize_t)(block->used-block->pos)) {
if (nwritten == 0) errno = EIO; if (nwritten == 0) errno = EIO;
return -1; return -1;
} }