diff --git a/src/aof.c b/src/aof.c index 150404bc3..482ec786d 100644 --- a/src/aof.c +++ b/src/aof.c @@ -49,6 +49,7 @@ int aofFileExist(char *filename); int rewriteAppendOnlyFile(char *filename); aofManifest *aofLoadManifestFromFile(sds am_filepath); void aofManifestFreeAndUpdate(aofManifest *am); +void aof_background_fsync_and_close(int fd); /* ---------------------------------------------------------------------------- * AOF Manifest file implementation. @@ -826,8 +827,14 @@ int openNewIncrAofForAppend(void) { /* If reaches here, we can safely modify the `server.aof_manifest` * and `server.aof_fd`. */ - /* Close old aof_fd if needed. */ - if (server.aof_fd != -1) bioCreateCloseJob(server.aof_fd); + /* fsync and close old aof_fd if needed. In fsync everysec it's ok to delay + * the fsync as long as we grantee it happens, and in fsync always the file + * is already synced at this point so fsync doesn't matter. */ + if (server.aof_fd != -1) { + aof_background_fsync_and_close(server.aof_fd); + server.aof_fsync_offset = server.aof_current_size; + server.aof_last_fsync = server.unixtime; + } server.aof_fd = newfd; /* Reset the aof_last_incr_size. */ @@ -904,6 +911,9 @@ int aofRewriteLimited(void) { /* Return true if an AOf fsync is currently already in progress in a * BIO thread. */ int aofFsyncInProgress(void) { + /* Note that we don't care about aof_background_fsync_and_close because + * server.aof_fd has been replaced by the new INCR AOF file fd, + * see openNewIncrAofForAppend. */ return bioPendingJobsOfType(BIO_AOF_FSYNC) != 0; } @@ -913,6 +923,11 @@ void aof_background_fsync(int fd) { bioCreateFsyncJob(fd); } +/* Close the fd on the basis of aof_background_fsync. */ +void aof_background_fsync_and_close(int fd) { + bioCreateCloseJob(fd, 1); +} + /* Kills an AOFRW child process if exists */ void killAppendOnlyChild(void) { int statloc; diff --git a/src/bio.c b/src/bio.c index f02389019..9242e51ed 100644 --- a/src/bio.c +++ b/src/bio.c @@ -76,12 +76,19 @@ static unsigned long long bio_pending[BIO_NUM_OPS]; /* This structure represents a background Job. It is only used locally to this * file as the API does not expose the internals at all. */ -struct bio_job { +typedef union bio_job { /* Job specific arguments.*/ - int fd; /* Fd for file based background jobs */ - lazy_free_fn *free_fn; /* Function that will free the provided arguments */ - void *free_args[]; /* List of arguments to be passed to the free function */ -}; + struct { + int fd; /* Fd for file based background jobs */ + unsigned need_fsync:1; /* A flag to indicate that a fsync is required before + * the file is closed. */ + } fd_args; + + struct { + lazy_free_fn *free_fn; /* Function that will free the provided arguments */ + void *free_args[]; /* List of arguments to be passed to the free function */ + } free_args; +} bio_job; void *bioProcessBackgroundJobs(void *arg); @@ -125,7 +132,7 @@ void bioInit(void) { } } -void bioSubmitJob(int type, struct bio_job *job) { +void bioSubmitJob(int type, bio_job *job) { pthread_mutex_lock(&bio_mutex[type]); listAddNodeTail(bio_jobs[type],job); bio_pending[type]++; @@ -137,33 +144,34 @@ void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...) { va_list valist; /* Allocate memory for the job structure and all required * arguments */ - struct bio_job *job = zmalloc(sizeof(*job) + sizeof(void *) * (arg_count)); - job->free_fn = free_fn; + bio_job *job = zmalloc(sizeof(*job) + sizeof(void *) * (arg_count)); + job->free_args.free_fn = free_fn; va_start(valist, arg_count); for (int i = 0; i < arg_count; i++) { - job->free_args[i] = va_arg(valist, void *); + job->free_args.free_args[i] = va_arg(valist, void *); } va_end(valist); bioSubmitJob(BIO_LAZY_FREE, job); } -void bioCreateCloseJob(int fd) { - struct bio_job *job = zmalloc(sizeof(*job)); - job->fd = fd; +void bioCreateCloseJob(int fd, int need_fsync) { + bio_job *job = zmalloc(sizeof(*job)); + job->fd_args.fd = fd; + job->fd_args.need_fsync = need_fsync; bioSubmitJob(BIO_CLOSE_FILE, job); } void bioCreateFsyncJob(int fd) { - struct bio_job *job = zmalloc(sizeof(*job)); - job->fd = fd; + bio_job *job = zmalloc(sizeof(*job)); + job->fd_args.fd = fd; bioSubmitJob(BIO_AOF_FSYNC, job); } void *bioProcessBackgroundJobs(void *arg) { - struct bio_job *job; + bio_job *job; unsigned long type = (unsigned long) arg; sigset_t sigset; @@ -216,12 +224,15 @@ void *bioProcessBackgroundJobs(void *arg) { /* Process the job accordingly to its type. */ if (type == BIO_CLOSE_FILE) { - close(job->fd); + if (job->fd_args.need_fsync) { + redis_fsync(job->fd_args.fd); + } + close(job->fd_args.fd); } else if (type == BIO_AOF_FSYNC) { /* The fd may be closed by main thread and reused for another * socket, pipe, or file. We just ignore these errno because * aof fsync did not really fail. */ - if (redis_fsync(job->fd) == -1 && + if (redis_fsync(job->fd_args.fd) == -1 && errno != EBADF && errno != EINVAL) { int last_status; @@ -236,7 +247,7 @@ void *bioProcessBackgroundJobs(void *arg) { atomicSet(server.aof_bio_fsync_status,C_OK); } } else if (type == BIO_LAZY_FREE) { - job->free_fn(job->free_args); + job->free_args.free_fn(job->free_args.free_args); } else { serverPanic("Wrong job type in bioProcessBackgroundJobs()."); } diff --git a/src/bio.h b/src/bio.h index 9eff6d16e..4a4c98ad0 100644 --- a/src/bio.h +++ b/src/bio.h @@ -37,7 +37,7 @@ void bioInit(void); unsigned long long bioPendingJobsOfType(int type); unsigned long long bioWaitStepOfType(int type); void bioKillThreads(void); -void bioCreateCloseJob(int fd); +void bioCreateCloseJob(int fd, int need_fsync); void bioCreateFsyncJob(int fd); void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...); diff --git a/src/replication.c b/src/replication.c index b929b0460..cf053779d 100644 --- a/src/replication.c +++ b/src/replication.c @@ -103,7 +103,7 @@ int bg_unlink(const char *filename) { errno = old_errno; return -1; } - bioCreateCloseJob(fd); + bioCreateCloseJob(fd, 0); return 0; /* Success. */ } } @@ -2149,7 +2149,7 @@ void readSyncBulkPayload(connection *conn) { return; } /* Close old rdb asynchronously. */ - if (old_rdb_fd != -1) bioCreateCloseJob(old_rdb_fd); + if (old_rdb_fd != -1) bioCreateCloseJob(old_rdb_fd, 0); /* Sync the directory to ensure rename is persisted */ if (fsyncFileDir(server.rdb_filename) == -1) {