Merge branch 'work.aio' of git://git.kernel.org/pub/scm/linux/kernel/git/viro/vfs

Pull aio race fixes and cleanups from Al Viro.

The aio code had more issues with error handling and races with the aio
completing at just the right (wrong) time along with freeing the file
descriptor when another thread closes the file.

Just a couple of these commits are the actual fixes: the others are
cleanups to either make the fixes simpler, or to make the code legible
and understandable enough that we hope there's no more fundamental races
hiding.

* 'work.aio' of git://git.kernel.org/pub/scm/linux/kernel/git/viro/vfs:
  aio: move sanity checks and request allocation to io_submit_one()
  deal with get_reqs_available() in aio_get_req() itself
  aio: move dropping ->ki_eventfd into iocb_destroy()
  make aio_read()/aio_write() return int
  Fix aio_poll() races
  aio: store event at final iocb_put()
  aio: keep io_event in aio_kiocb
  aio: fold lookup_kiocb() into its sole caller
  pin iocb through aio.
This commit is contained in:
Linus Torvalds 2019-04-01 08:28:36 -07:00
commit 5e7a8ca319
1 changed files with 151 additions and 189 deletions

340
fs/aio.c
View File

@ -181,7 +181,7 @@ struct poll_iocb {
struct file *file;
struct wait_queue_head *head;
__poll_t events;
bool woken;
bool done;
bool cancelled;
struct wait_queue_entry wait;
struct work_struct work;
@ -204,8 +204,7 @@ struct aio_kiocb {
struct kioctx *ki_ctx;
kiocb_cancel_fn *ki_cancel;
struct iocb __user *ki_user_iocb; /* user's aiocb */
__u64 ki_user_data; /* user's data for completion */
struct io_event ki_res;
struct list_head ki_list; /* the aio core uses this
* for cancellation */
@ -1022,6 +1021,9 @@ static bool get_reqs_available(struct kioctx *ctx)
/* aio_get_req
* Allocate a slot for an aio request.
* Returns NULL if no requests are free.
*
* The refcount is initialized to 2 - one for the async op completion,
* one for the synchronous code that does this.
*/
static inline struct aio_kiocb *aio_get_req(struct kioctx *ctx)
{
@ -1031,10 +1033,15 @@ static inline struct aio_kiocb *aio_get_req(struct kioctx *ctx)
if (unlikely(!req))
return NULL;
if (unlikely(!get_reqs_available(ctx))) {
kfree(req);
return NULL;
}
percpu_ref_get(&ctx->reqs);
req->ki_ctx = ctx;
INIT_LIST_HEAD(&req->ki_list);
refcount_set(&req->ki_refcnt, 0);
refcount_set(&req->ki_refcnt, 2);
req->ki_eventfd = NULL;
return req;
}
@ -1067,30 +1074,20 @@ static struct kioctx *lookup_ioctx(unsigned long ctx_id)
return ret;
}
static inline void iocb_put(struct aio_kiocb *iocb)
static inline void iocb_destroy(struct aio_kiocb *iocb)
{
if (refcount_read(&iocb->ki_refcnt) == 0 ||
refcount_dec_and_test(&iocb->ki_refcnt)) {
if (iocb->ki_filp)
fput(iocb->ki_filp);
percpu_ref_put(&iocb->ki_ctx->reqs);
kmem_cache_free(kiocb_cachep, iocb);
}
}
static void aio_fill_event(struct io_event *ev, struct aio_kiocb *iocb,
long res, long res2)
{
ev->obj = (u64)(unsigned long)iocb->ki_user_iocb;
ev->data = iocb->ki_user_data;
ev->res = res;
ev->res2 = res2;
if (iocb->ki_eventfd)
eventfd_ctx_put(iocb->ki_eventfd);
if (iocb->ki_filp)
fput(iocb->ki_filp);
percpu_ref_put(&iocb->ki_ctx->reqs);
kmem_cache_free(kiocb_cachep, iocb);
}
/* aio_complete
* Called when the io request on the given iocb is complete.
*/
static void aio_complete(struct aio_kiocb *iocb, long res, long res2)
static void aio_complete(struct aio_kiocb *iocb)
{
struct kioctx *ctx = iocb->ki_ctx;
struct aio_ring *ring;
@ -1114,14 +1111,14 @@ static void aio_complete(struct aio_kiocb *iocb, long res, long res2)
ev_page = kmap_atomic(ctx->ring_pages[pos / AIO_EVENTS_PER_PAGE]);
event = ev_page + pos % AIO_EVENTS_PER_PAGE;
aio_fill_event(event, iocb, res, res2);
*event = iocb->ki_res;
kunmap_atomic(ev_page);
flush_dcache_page(ctx->ring_pages[pos / AIO_EVENTS_PER_PAGE]);
pr_debug("%p[%u]: %p: %p %Lx %lx %lx\n",
ctx, tail, iocb, iocb->ki_user_iocb, iocb->ki_user_data,
res, res2);
pr_debug("%p[%u]: %p: %p %Lx %Lx %Lx\n", ctx, tail, iocb,
(void __user *)(unsigned long)iocb->ki_res.obj,
iocb->ki_res.data, iocb->ki_res.res, iocb->ki_res.res2);
/* after flagging the request as done, we
* must never even look at it again
@ -1148,10 +1145,8 @@ static void aio_complete(struct aio_kiocb *iocb, long res, long res2)
* eventfd. The eventfd_signal() function is safe to be called
* from IRQ context.
*/
if (iocb->ki_eventfd) {
if (iocb->ki_eventfd)
eventfd_signal(iocb->ki_eventfd, 1);
eventfd_ctx_put(iocb->ki_eventfd);
}
/*
* We have to order our ring_info tail store above and test
@ -1163,7 +1158,14 @@ static void aio_complete(struct aio_kiocb *iocb, long res, long res2)
if (waitqueue_active(&ctx->wait))
wake_up(&ctx->wait);
iocb_put(iocb);
}
static inline void iocb_put(struct aio_kiocb *iocb)
{
if (refcount_dec_and_test(&iocb->ki_refcnt)) {
aio_complete(iocb);
iocb_destroy(iocb);
}
}
/* aio_read_events_ring
@ -1437,7 +1439,9 @@ static void aio_complete_rw(struct kiocb *kiocb, long res, long res2)
file_end_write(kiocb->ki_filp);
}
aio_complete(iocb, res, res2);
iocb->ki_res.res = res;
iocb->ki_res.res2 = res2;
iocb_put(iocb);
}
static int aio_prep_rw(struct kiocb *req, const struct iocb *iocb)
@ -1514,13 +1518,13 @@ static inline void aio_rw_done(struct kiocb *req, ssize_t ret)
}
}
static ssize_t aio_read(struct kiocb *req, const struct iocb *iocb,
static int aio_read(struct kiocb *req, const struct iocb *iocb,
bool vectored, bool compat)
{
struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
struct iov_iter iter;
struct file *file;
ssize_t ret;
int ret;
ret = aio_prep_rw(req, iocb);
if (ret)
@ -1542,13 +1546,13 @@ static ssize_t aio_read(struct kiocb *req, const struct iocb *iocb,
return ret;
}
static ssize_t aio_write(struct kiocb *req, const struct iocb *iocb,
static int aio_write(struct kiocb *req, const struct iocb *iocb,
bool vectored, bool compat)
{
struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
struct iov_iter iter;
struct file *file;
ssize_t ret;
int ret;
ret = aio_prep_rw(req, iocb);
if (ret)
@ -1585,11 +1589,10 @@ static ssize_t aio_write(struct kiocb *req, const struct iocb *iocb,
static void aio_fsync_work(struct work_struct *work)
{
struct fsync_iocb *req = container_of(work, struct fsync_iocb, work);
int ret;
struct aio_kiocb *iocb = container_of(work, struct aio_kiocb, fsync.work);
ret = vfs_fsync(req->file, req->datasync);
aio_complete(container_of(req, struct aio_kiocb, fsync), ret, 0);
iocb->ki_res.res = vfs_fsync(iocb->fsync.file, iocb->fsync.datasync);
iocb_put(iocb);
}
static int aio_fsync(struct fsync_iocb *req, const struct iocb *iocb,
@ -1608,11 +1611,6 @@ static int aio_fsync(struct fsync_iocb *req, const struct iocb *iocb,
return 0;
}
static inline void aio_poll_complete(struct aio_kiocb *iocb, __poll_t mask)
{
aio_complete(iocb, mangle_poll(mask), 0);
}
static void aio_poll_complete_work(struct work_struct *work)
{
struct poll_iocb *req = container_of(work, struct poll_iocb, work);
@ -1638,9 +1636,11 @@ static void aio_poll_complete_work(struct work_struct *work)
return;
}
list_del_init(&iocb->ki_list);
iocb->ki_res.res = mangle_poll(mask);
req->done = true;
spin_unlock_irq(&ctx->ctx_lock);
aio_poll_complete(iocb, mask);
iocb_put(iocb);
}
/* assumes we are called with irqs disabled */
@ -1668,31 +1668,27 @@ static int aio_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
__poll_t mask = key_to_poll(key);
unsigned long flags;
req->woken = true;
/* for instances that support it check for an event match first: */
if (mask) {
if (!(mask & req->events))
return 0;
if (mask && !(mask & req->events))
return 0;
list_del_init(&req->wait.entry);
if (mask && spin_trylock_irqsave(&iocb->ki_ctx->ctx_lock, flags)) {
/*
* Try to complete the iocb inline if we can. Use
* irqsave/irqrestore because not all filesystems (e.g. fuse)
* call this function with IRQs disabled and because IRQs
* have to be disabled before ctx_lock is obtained.
*/
if (spin_trylock_irqsave(&iocb->ki_ctx->ctx_lock, flags)) {
list_del(&iocb->ki_list);
spin_unlock_irqrestore(&iocb->ki_ctx->ctx_lock, flags);
list_del_init(&req->wait.entry);
aio_poll_complete(iocb, mask);
return 1;
}
list_del(&iocb->ki_list);
iocb->ki_res.res = mangle_poll(mask);
req->done = true;
spin_unlock_irqrestore(&iocb->ki_ctx->ctx_lock, flags);
iocb_put(iocb);
} else {
schedule_work(&req->work);
}
list_del_init(&req->wait.entry);
schedule_work(&req->work);
return 1;
}
@ -1719,11 +1715,12 @@ aio_poll_queue_proc(struct file *file, struct wait_queue_head *head,
add_wait_queue(head, &pt->iocb->poll.wait);
}
static ssize_t aio_poll(struct aio_kiocb *aiocb, const struct iocb *iocb)
static int aio_poll(struct aio_kiocb *aiocb, const struct iocb *iocb)
{
struct kioctx *ctx = aiocb->ki_ctx;
struct poll_iocb *req = &aiocb->poll;
struct aio_poll_table apt;
bool cancel = false;
__poll_t mask;
/* reject any unknown events outside the normal event mask. */
@ -1737,7 +1734,7 @@ static ssize_t aio_poll(struct aio_kiocb *aiocb, const struct iocb *iocb)
req->events = demangle_poll(iocb->aio_buf) | EPOLLERR | EPOLLHUP;
req->head = NULL;
req->woken = false;
req->done = false;
req->cancelled = false;
apt.pt._qproc = aio_poll_queue_proc;
@ -1749,156 +1746,135 @@ static ssize_t aio_poll(struct aio_kiocb *aiocb, const struct iocb *iocb)
INIT_LIST_HEAD(&req->wait.entry);
init_waitqueue_func_entry(&req->wait, aio_poll_wake);
/* one for removal from waitqueue, one for this function */
refcount_set(&aiocb->ki_refcnt, 2);
mask = vfs_poll(req->file, &apt.pt) & req->events;
if (unlikely(!req->head)) {
/* we did not manage to set up a waitqueue, done */
goto out;
}
spin_lock_irq(&ctx->ctx_lock);
spin_lock(&req->head->lock);
if (req->woken) {
/* wake_up context handles the rest */
mask = 0;
apt.error = 0;
} else if (mask || apt.error) {
/* if we get an error or a mask we are done */
WARN_ON_ONCE(list_empty(&req->wait.entry));
list_del_init(&req->wait.entry);
} else {
/* actually waiting for an event */
list_add_tail(&aiocb->ki_list, &ctx->active_reqs);
aiocb->ki_cancel = aio_poll_cancel;
if (likely(req->head)) {
spin_lock(&req->head->lock);
if (unlikely(list_empty(&req->wait.entry))) {
if (apt.error)
cancel = true;
apt.error = 0;
mask = 0;
}
if (mask || apt.error) {
list_del_init(&req->wait.entry);
} else if (cancel) {
WRITE_ONCE(req->cancelled, true);
} else if (!req->done) { /* actually waiting for an event */
list_add_tail(&aiocb->ki_list, &ctx->active_reqs);
aiocb->ki_cancel = aio_poll_cancel;
}
spin_unlock(&req->head->lock);
}
if (mask) { /* no async, we'd stolen it */
aiocb->ki_res.res = mangle_poll(mask);
apt.error = 0;
}
spin_unlock(&req->head->lock);
spin_unlock_irq(&ctx->ctx_lock);
out:
if (unlikely(apt.error))
return apt.error;
if (mask)
aio_poll_complete(aiocb, mask);
iocb_put(aiocb);
return 0;
iocb_put(aiocb);
return apt.error;
}
static int __io_submit_one(struct kioctx *ctx, const struct iocb *iocb,
struct iocb __user *user_iocb, bool compat)
struct iocb __user *user_iocb, struct aio_kiocb *req,
bool compat)
{
struct aio_kiocb *req;
ssize_t ret;
/* enforce forwards compatibility on users */
if (unlikely(iocb->aio_reserved2)) {
pr_debug("EINVAL: reserve field set\n");
return -EINVAL;
}
/* prevent overflows */
if (unlikely(
(iocb->aio_buf != (unsigned long)iocb->aio_buf) ||
(iocb->aio_nbytes != (size_t)iocb->aio_nbytes) ||
((ssize_t)iocb->aio_nbytes < 0)
)) {
pr_debug("EINVAL: overflow check\n");
return -EINVAL;
}
if (!get_reqs_available(ctx))
return -EAGAIN;
ret = -EAGAIN;
req = aio_get_req(ctx);
if (unlikely(!req))
goto out_put_reqs_available;
req->ki_filp = fget(iocb->aio_fildes);
ret = -EBADF;
if (unlikely(!req->ki_filp))
goto out_put_req;
return -EBADF;
if (iocb->aio_flags & IOCB_FLAG_RESFD) {
struct eventfd_ctx *eventfd;
/*
* If the IOCB_FLAG_RESFD flag of aio_flags is set, get an
* instance of the file* now. The file descriptor must be
* an eventfd() fd, and will be signaled for each completed
* event using the eventfd_signal() function.
*/
req->ki_eventfd = eventfd_ctx_fdget((int) iocb->aio_resfd);
if (IS_ERR(req->ki_eventfd)) {
ret = PTR_ERR(req->ki_eventfd);
req->ki_eventfd = NULL;
goto out_put_req;
}
eventfd = eventfd_ctx_fdget(iocb->aio_resfd);
if (IS_ERR(eventfd))
return PTR_ERR(req->ki_eventfd);
req->ki_eventfd = eventfd;
}
ret = put_user(KIOCB_KEY, &user_iocb->aio_key);
if (unlikely(ret)) {
if (unlikely(put_user(KIOCB_KEY, &user_iocb->aio_key))) {
pr_debug("EFAULT: aio_key\n");
goto out_put_req;
return -EFAULT;
}
req->ki_user_iocb = user_iocb;
req->ki_user_data = iocb->aio_data;
req->ki_res.obj = (u64)(unsigned long)user_iocb;
req->ki_res.data = iocb->aio_data;
req->ki_res.res = 0;
req->ki_res.res2 = 0;
switch (iocb->aio_lio_opcode) {
case IOCB_CMD_PREAD:
ret = aio_read(&req->rw, iocb, false, compat);
break;
return aio_read(&req->rw, iocb, false, compat);
case IOCB_CMD_PWRITE:
ret = aio_write(&req->rw, iocb, false, compat);
break;
return aio_write(&req->rw, iocb, false, compat);
case IOCB_CMD_PREADV:
ret = aio_read(&req->rw, iocb, true, compat);
break;
return aio_read(&req->rw, iocb, true, compat);
case IOCB_CMD_PWRITEV:
ret = aio_write(&req->rw, iocb, true, compat);
break;
return aio_write(&req->rw, iocb, true, compat);
case IOCB_CMD_FSYNC:
ret = aio_fsync(&req->fsync, iocb, false);
break;
return aio_fsync(&req->fsync, iocb, false);
case IOCB_CMD_FDSYNC:
ret = aio_fsync(&req->fsync, iocb, true);
break;
return aio_fsync(&req->fsync, iocb, true);
case IOCB_CMD_POLL:
ret = aio_poll(req, iocb);
break;
return aio_poll(req, iocb);
default:
pr_debug("invalid aio operation %d\n", iocb->aio_lio_opcode);
ret = -EINVAL;
break;
return -EINVAL;
}
/*
* If ret is 0, we'd either done aio_complete() ourselves or have
* arranged for that to be done asynchronously. Anything non-zero
* means that we need to destroy req ourselves.
*/
if (ret)
goto out_put_req;
return 0;
out_put_req:
if (req->ki_eventfd)
eventfd_ctx_put(req->ki_eventfd);
iocb_put(req);
out_put_reqs_available:
put_reqs_available(ctx, 1);
return ret;
}
static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
bool compat)
{
struct aio_kiocb *req;
struct iocb iocb;
int err;
if (unlikely(copy_from_user(&iocb, user_iocb, sizeof(iocb))))
return -EFAULT;
return __io_submit_one(ctx, &iocb, user_iocb, compat);
/* enforce forwards compatibility on users */
if (unlikely(iocb.aio_reserved2)) {
pr_debug("EINVAL: reserve field set\n");
return -EINVAL;
}
/* prevent overflows */
if (unlikely(
(iocb.aio_buf != (unsigned long)iocb.aio_buf) ||
(iocb.aio_nbytes != (size_t)iocb.aio_nbytes) ||
((ssize_t)iocb.aio_nbytes < 0)
)) {
pr_debug("EINVAL: overflow check\n");
return -EINVAL;
}
req = aio_get_req(ctx);
if (unlikely(!req))
return -EAGAIN;
err = __io_submit_one(ctx, &iocb, user_iocb, req, compat);
/* Done with the synchronous reference */
iocb_put(req);
/*
* If err is 0, we'd either done aio_complete() ourselves or have
* arranged for that to be done asynchronously. Anything non-zero
* means that we need to destroy req ourselves.
*/
if (unlikely(err)) {
iocb_destroy(req);
put_reqs_available(ctx, 1);
}
return err;
}
/* sys_io_submit:
@ -1997,24 +1973,6 @@ COMPAT_SYSCALL_DEFINE3(io_submit, compat_aio_context_t, ctx_id,
}
#endif
/* lookup_kiocb
* Finds a given iocb for cancellation.
*/
static struct aio_kiocb *
lookup_kiocb(struct kioctx *ctx, struct iocb __user *iocb)
{
struct aio_kiocb *kiocb;
assert_spin_locked(&ctx->ctx_lock);
/* TODO: use a hash or array, this sucks. */
list_for_each_entry(kiocb, &ctx->active_reqs, ki_list) {
if (kiocb->ki_user_iocb == iocb)
return kiocb;
}
return NULL;
}
/* sys_io_cancel:
* Attempts to cancel an iocb previously passed to io_submit. If
* the operation is successfully cancelled, the resulting event is
@ -2032,6 +1990,7 @@ SYSCALL_DEFINE3(io_cancel, aio_context_t, ctx_id, struct iocb __user *, iocb,
struct aio_kiocb *kiocb;
int ret = -EINVAL;
u32 key;
u64 obj = (u64)(unsigned long)iocb;
if (unlikely(get_user(key, &iocb->aio_key)))
return -EFAULT;
@ -2043,10 +2002,13 @@ SYSCALL_DEFINE3(io_cancel, aio_context_t, ctx_id, struct iocb __user *, iocb,
return -EINVAL;
spin_lock_irq(&ctx->ctx_lock);
kiocb = lookup_kiocb(ctx, iocb);
if (kiocb) {
ret = kiocb->ki_cancel(&kiocb->rw);
list_del_init(&kiocb->ki_list);
/* TODO: use a hash or array, this sucks. */
list_for_each_entry(kiocb, &ctx->active_reqs, ki_list) {
if (kiocb->ki_res.obj == obj) {
ret = kiocb->ki_cancel(&kiocb->rw);
list_del_init(&kiocb->ki_list);
break;
}
}
spin_unlock_irq(&ctx->ctx_lock);