Fsync the old AOF file when opening a new INCR AOF (#11004)

In rewriteAppendOnlyFileBackground, after flushAppendOnlyFile(1),
and before openNewIncrAofForAppend, we should call redis_fsync
to fsync the aof file.

Because we may open a new INCR AOF in openNewIncrAofForAppend,
in the case of using everysec policy, the old AOF file may not
be fsynced in time (or even at all).

When using everysec, we don't want to pay the disk latency from
the main thread, so we will do a background fsync.

Add an argument to bioCreateCloseJob: a `need_fsync` flag that
indicates an fsync is required before the file is closed. This way we
fsync the old AOF file before we close it.

As a cleanup, we make bio_job a union, since the free_* args and
the fd / fsync args are never used together.

Co-authored-by: Oran Agra <oran@redislabs.com>
This commit is contained in:
Binbin 2022-07-25 14:16:35 +08:00 committed by GitHub
parent 39d216a326
commit 03fff10ab4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 49 additions and 23 deletions

View File

@ -49,6 +49,7 @@ int aofFileExist(char *filename);
int rewriteAppendOnlyFile(char *filename);
aofManifest *aofLoadManifestFromFile(sds am_filepath);
void aofManifestFreeAndUpdate(aofManifest *am);
void aof_background_fsync_and_close(int fd);
/* ----------------------------------------------------------------------------
* AOF Manifest file implementation.
@ -826,8 +827,14 @@ int openNewIncrAofForAppend(void) {
/* If reaches here, we can safely modify the `server.aof_manifest`
* and `server.aof_fd`. */
/* Close old aof_fd if needed. */
if (server.aof_fd != -1) bioCreateCloseJob(server.aof_fd);
/* fsync and close old aof_fd if needed. In fsync everysec it's ok to delay
* the fsync as long as we guarantee it happens, and in fsync always the file
* is already synced at this point so fsync doesn't matter. */
if (server.aof_fd != -1) {
aof_background_fsync_and_close(server.aof_fd);
server.aof_fsync_offset = server.aof_current_size;
server.aof_last_fsync = server.unixtime;
}
server.aof_fd = newfd;
/* Reset the aof_last_incr_size. */
@ -904,6 +911,9 @@ int aofRewriteLimited(void) {
/* Return true if an AOF fsync is currently already in progress in a
* BIO thread, i.e. at least one BIO_AOF_FSYNC job is still pending. */
int aofFsyncInProgress(void) {
/* Note that we don't care about aof_background_fsync_and_close: those
* jobs are submitted through bioCreateCloseJob rather than as
* BIO_AOF_FSYNC jobs, so they are not counted here. That is fine because
* server.aof_fd has been replaced by the new INCR AOF file fd by then,
* see openNewIncrAofForAppend. */
return bioPendingJobsOfType(BIO_AOF_FSYNC) != 0;
}
@ -913,6 +923,11 @@ void aof_background_fsync(int fd) {
bioCreateFsyncJob(fd);
}
/* Like aof_background_fsync, but the fd is also closed afterwards: submit a
* background close job for `fd` that fsyncs the file before closing it. */
void aof_background_fsync_and_close(int fd) {
bioCreateCloseJob(fd, 1); /* 1 == need_fsync: fsync before the close. */
}
/* Kills an AOFRW child process if exists */
void killAppendOnlyChild(void) {
int statloc;

View File

@ -76,12 +76,19 @@ static unsigned long long bio_pending[BIO_NUM_OPS];
/* This structure represents a background Job. It is only used locally to this
* file as the API does not expose the internals at all. */
struct bio_job {
typedef union bio_job {
/* Job specific arguments.*/
int fd; /* Fd for file based background jobs */
lazy_free_fn *free_fn; /* Function that will free the provided arguments */
void *free_args[]; /* List of arguments to be passed to the free function */
};
struct {
int fd; /* Fd for file based background jobs */
unsigned need_fsync:1; /* A flag to indicate that a fsync is required before
* the file is closed. */
} fd_args;
struct {
lazy_free_fn *free_fn; /* Function that will free the provided arguments */
void *free_args[]; /* List of arguments to be passed to the free function */
} free_args;
} bio_job;
void *bioProcessBackgroundJobs(void *arg);
@ -125,7 +132,7 @@ void bioInit(void) {
}
}
void bioSubmitJob(int type, struct bio_job *job) {
void bioSubmitJob(int type, bio_job *job) {
pthread_mutex_lock(&bio_mutex[type]);
listAddNodeTail(bio_jobs[type],job);
bio_pending[type]++;
@ -137,33 +144,34 @@ void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...) {
va_list valist;
/* Allocate memory for the job structure and all required
* arguments */
struct bio_job *job = zmalloc(sizeof(*job) + sizeof(void *) * (arg_count));
job->free_fn = free_fn;
bio_job *job = zmalloc(sizeof(*job) + sizeof(void *) * (arg_count));
job->free_args.free_fn = free_fn;
va_start(valist, arg_count);
for (int i = 0; i < arg_count; i++) {
job->free_args[i] = va_arg(valist, void *);
job->free_args.free_args[i] = va_arg(valist, void *);
}
va_end(valist);
bioSubmitJob(BIO_LAZY_FREE, job);
}
void bioCreateCloseJob(int fd) {
struct bio_job *job = zmalloc(sizeof(*job));
job->fd = fd;
void bioCreateCloseJob(int fd, int need_fsync) {
bio_job *job = zmalloc(sizeof(*job));
job->fd_args.fd = fd;
job->fd_args.need_fsync = need_fsync;
bioSubmitJob(BIO_CLOSE_FILE, job);
}
void bioCreateFsyncJob(int fd) {
struct bio_job *job = zmalloc(sizeof(*job));
job->fd = fd;
bio_job *job = zmalloc(sizeof(*job));
job->fd_args.fd = fd;
bioSubmitJob(BIO_AOF_FSYNC, job);
}
void *bioProcessBackgroundJobs(void *arg) {
struct bio_job *job;
bio_job *job;
unsigned long type = (unsigned long) arg;
sigset_t sigset;
@ -216,12 +224,15 @@ void *bioProcessBackgroundJobs(void *arg) {
/* Process the job accordingly to its type. */
if (type == BIO_CLOSE_FILE) {
close(job->fd);
if (job->fd_args.need_fsync) {
redis_fsync(job->fd_args.fd);
}
close(job->fd_args.fd);
} else if (type == BIO_AOF_FSYNC) {
/* The fd may be closed by main thread and reused for another
* socket, pipe, or file. We just ignore these errno because
* aof fsync did not really fail. */
if (redis_fsync(job->fd) == -1 &&
if (redis_fsync(job->fd_args.fd) == -1 &&
errno != EBADF && errno != EINVAL)
{
int last_status;
@ -236,7 +247,7 @@ void *bioProcessBackgroundJobs(void *arg) {
atomicSet(server.aof_bio_fsync_status,C_OK);
}
} else if (type == BIO_LAZY_FREE) {
job->free_fn(job->free_args);
job->free_args.free_fn(job->free_args.free_args);
} else {
serverPanic("Wrong job type in bioProcessBackgroundJobs().");
}

View File

@ -37,7 +37,7 @@ void bioInit(void);
unsigned long long bioPendingJobsOfType(int type);
unsigned long long bioWaitStepOfType(int type);
void bioKillThreads(void);
void bioCreateCloseJob(int fd);
void bioCreateCloseJob(int fd, int need_fsync);
void bioCreateFsyncJob(int fd);
void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...);

View File

@ -103,7 +103,7 @@ int bg_unlink(const char *filename) {
errno = old_errno;
return -1;
}
bioCreateCloseJob(fd);
bioCreateCloseJob(fd, 0);
return 0; /* Success. */
}
}
@ -2149,7 +2149,7 @@ void readSyncBulkPayload(connection *conn) {
return;
}
/* Close old rdb asynchronously. */
if (old_rdb_fd != -1) bioCreateCloseJob(old_rdb_fd);
if (old_rdb_fd != -1) bioCreateCloseJob(old_rdb_fd, 0);
/* Sync the directory to ensure rename is persisted */
if (fsyncFileDir(server.rdb_filename) == -1) {