diff options
author | Linus Torvalds | 2022-03-21 16:24:45 -0700 |
---|---|---|
committer | Linus Torvalds | 2022-03-21 16:24:45 -0700 |
commit | af472a9efdf65cbb3398cb6478ec0e89fbc84109 (patch) | |
tree | 11ec956e35851d6b579cbab5c555f70598adc52c /fs | |
parent | 93e220a62da36f766b3188e76e234607e41488f9 (diff) | |
parent | 5e929367468c8f97cd1ffb0417316cecfebef94b (diff) |
Merge tag 'for-5.18/io_uring-2022-03-18' of git://git.kernel.dk/linux-block
Pull io_uring updates from Jens Axboe:
- Fixes for current file position. Still doesn't have the f_pos_lock
sorted, but it's a step in the right direction (Dylan)
- Tracing updates (Dylan, Stefan)
- Improvements to io-wq locking (Hao)
- Improvements for provided buffers (me, Pavel)
- Support for registered file descriptors (me, Xiaoguang)
- Support for ring messages (me)
- Poll improvements (me)
- Fix for fixed buffers and non-iterator reads/writes (me)
- Support for NAPI on sockets (Olivier)
- Ring quiesce improvements (Usama)
- Misc fixes (Olivier, Pavel)
* tag 'for-5.18/io_uring-2022-03-18' of git://git.kernel.dk/linux-block: (42 commits)
io_uring: terminate manual loop iterator loop correctly for non-vecs
io_uring: don't check unrelated req->open.how in accept request
io_uring: manage provided buffers strictly ordered
io_uring: fold evfd signalling under a slower path
io_uring: thin down io_commit_cqring()
io_uring: shuffle io_eventfd_signal() bits around
io_uring: remove extra barrier for non-sqpoll iopoll
io_uring: fix provided buffer return on failure for kiocb_done()
io_uring: extend provided buf return to fails
io_uring: refactor timeout cancellation cqe posting
io_uring: normilise naming for fill_cqe*
io_uring: cache poll/double-poll state with a request flag
io_uring: cache req->apoll->events in req->cflags
io_uring: move req->poll_refs into previous struct hole
io_uring: make tracing format consistent
io_uring: recycle apoll_poll entries
io_uring: remove duplicated member check for io_msg_ring_prep()
io_uring: allow submissions to continue on error
io_uring: recycle provided buffers if request goes async
io_uring: ensure reads re-import for selected buffers
...
Diffstat (limited to 'fs')
-rw-r--r-- | fs/io-wq.c | 114 | ||||
-rw-r--r-- | fs/io_uring.c | 1251 |
2 files changed, 1024 insertions, 341 deletions
diff --git a/fs/io-wq.c b/fs/io-wq.c index bb7f161bb19c..5b93fa67d346 100644 --- a/fs/io-wq.c +++ b/fs/io-wq.c @@ -76,6 +76,7 @@ struct io_wqe_acct { unsigned max_workers; int index; atomic_t nr_running; + raw_spinlock_t lock; struct io_wq_work_list work_list; unsigned long flags; }; @@ -91,7 +92,7 @@ enum { */ struct io_wqe { raw_spinlock_t lock; - struct io_wqe_acct acct[2]; + struct io_wqe_acct acct[IO_WQ_ACCT_NR]; int node; @@ -224,12 +225,12 @@ static void io_worker_exit(struct io_worker *worker) if (worker->flags & IO_WORKER_F_FREE) hlist_nulls_del_rcu(&worker->nulls_node); list_del_rcu(&worker->all_list); - preempt_disable(); + raw_spin_unlock(&wqe->lock); io_wqe_dec_running(worker); worker->flags = 0; + preempt_disable(); current->flags &= ~PF_IO_WORKER; preempt_enable(); - raw_spin_unlock(&wqe->lock); kfree_rcu(worker, rcu); io_worker_ref_put(wqe->wq); @@ -238,10 +239,15 @@ static void io_worker_exit(struct io_worker *worker) static inline bool io_acct_run_queue(struct io_wqe_acct *acct) { + bool ret = false; + + raw_spin_lock(&acct->lock); if (!wq_list_empty(&acct->work_list) && !test_bit(IO_ACCT_STALLED_BIT, &acct->flags)) - return true; - return false; + ret = true; + raw_spin_unlock(&acct->lock); + + return ret; } /* @@ -385,7 +391,6 @@ fail: } static void io_wqe_dec_running(struct io_worker *worker) - __must_hold(wqe->lock) { struct io_wqe_acct *acct = io_wqe_get_acct(worker); struct io_wqe *wqe = worker->wqe; @@ -393,13 +398,14 @@ static void io_wqe_dec_running(struct io_worker *worker) if (!(worker->flags & IO_WORKER_F_UP)) return; - if (atomic_dec_and_test(&acct->nr_running) && io_acct_run_queue(acct)) { - atomic_inc(&acct->nr_running); - atomic_inc(&wqe->wq->worker_refs); - raw_spin_unlock(&wqe->lock); - io_queue_worker_create(worker, acct, create_worker_cb); - raw_spin_lock(&wqe->lock); - } + if (!atomic_dec_and_test(&acct->nr_running)) + return; + if (!io_acct_run_queue(acct)) + return; + + atomic_inc(&acct->nr_running); + atomic_inc(&wqe->wq->worker_refs); + io_queue_worker_create(worker, acct, create_worker_cb); } /* @@ -407,11 +413,12 @@ static void io_wqe_dec_running(struct io_worker *worker) * it's currently on the freelist */ static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker) - __must_hold(wqe->lock) { if (worker->flags & IO_WORKER_F_FREE) { worker->flags &= ~IO_WORKER_F_FREE; + raw_spin_lock(&wqe->lock); hlist_nulls_del_init_rcu(&worker->nulls_node); + raw_spin_unlock(&wqe->lock); } } @@ -456,7 +463,7 @@ static bool io_wait_on_hash(struct io_wqe *wqe, unsigned int hash) static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct, struct io_worker *worker) - __must_hold(wqe->lock) + __must_hold(acct->lock) { struct io_wq_work_node *node, *prev; struct io_wq_work *work, *tail; @@ -498,9 +505,9 @@ static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct, * work being added and clearing the stalled bit. */ set_bit(IO_ACCT_STALLED_BIT, &acct->flags); - raw_spin_unlock(&wqe->lock); + raw_spin_unlock(&acct->lock); unstalled = io_wait_on_hash(wqe, stall_hash); - raw_spin_lock(&wqe->lock); + raw_spin_lock(&acct->lock); if (unstalled) { clear_bit(IO_ACCT_STALLED_BIT, &acct->flags); if (wq_has_sleeper(&wqe->wq->hash->wait)) @@ -538,7 +545,6 @@ static void io_assign_current_work(struct io_worker *worker, static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work); static void io_worker_handle_work(struct io_worker *worker) - __releases(wqe->lock) { struct io_wqe_acct *acct = io_wqe_get_acct(worker); struct io_wqe *wqe = worker->wqe; @@ -555,7 +561,9 @@ static void io_worker_handle_work(struct io_worker *worker) * can't make progress, any work completion or insertion will * clear the stalled flag. */ + raw_spin_lock(&acct->lock); work = io_get_next_work(acct, worker); + raw_spin_unlock(&acct->lock); if (work) { __io_worker_busy(wqe, worker); @@ -569,10 +577,9 @@ static void io_worker_handle_work(struct io_worker *worker) raw_spin_lock(&worker->lock); worker->next_work = work; raw_spin_unlock(&worker->lock); - } - raw_spin_unlock(&wqe->lock); - if (!work) + } else { break; + } io_assign_current_work(worker, work); __set_current_state(TASK_RUNNING); @@ -608,8 +615,6 @@ static void io_worker_handle_work(struct io_worker *worker) wake_up(&wq->hash->wait); } } while (work); - - raw_spin_lock(&wqe->lock); } while (1); } @@ -633,12 +638,10 @@ static int io_wqe_worker(void *data) long ret; set_current_state(TASK_INTERRUPTIBLE); -loop: - raw_spin_lock(&wqe->lock); - if (io_acct_run_queue(acct)) { + while (io_acct_run_queue(acct)) io_worker_handle_work(worker); - goto loop; - } + + raw_spin_lock(&wqe->lock); /* timed out, exit unless we're the last worker */ if (last_timeout && acct->nr_workers > 1) { acct->nr_workers--; @@ -662,10 +665,8 @@ loop: last_timeout = !ret; } - if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) { - raw_spin_lock(&wqe->lock); + if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) io_worker_handle_work(worker); - } audit_free(current); io_worker_exit(worker); @@ -705,10 +706,7 @@ void io_wq_worker_sleeping(struct task_struct *tsk) return; worker->flags &= ~IO_WORKER_F_RUNNING; - - raw_spin_lock(&worker->wqe->lock); io_wqe_dec_running(worker); - raw_spin_unlock(&worker->wqe->lock); } static void io_init_new_worker(struct io_wqe *wqe, struct io_worker *worker, @@ -778,10 +776,12 @@ static void create_worker_cont(struct callback_head *cb) .cancel_all = true, }; + raw_spin_unlock(&wqe->lock); while (io_acct_cancel_pending_work(wqe, acct, &match)) - raw_spin_lock(&wqe->lock); + ; + } else { + raw_spin_unlock(&wqe->lock); } - raw_spin_unlock(&wqe->lock); io_worker_ref_put(wqe->wq); kfree(worker); return; @@ -914,6 +914,7 @@ static bool io_wq_work_match_item(struct io_wq_work *work, void *data) static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work) { struct io_wqe_acct *acct = io_work_get_acct(wqe, work); + struct io_cb_cancel_data match; unsigned work_flags = work->flags; bool do_create; @@ -927,10 +928,12 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work) return; } - raw_spin_lock(&wqe->lock); + raw_spin_lock(&acct->lock); io_wqe_insert_work(wqe, work); clear_bit(IO_ACCT_STALLED_BIT, &acct->flags); + raw_spin_unlock(&acct->lock); + raw_spin_lock(&wqe->lock); rcu_read_lock(); do_create = !io_wqe_activate_free_worker(wqe, acct); rcu_read_unlock(); @@ -946,18 +949,18 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work) return; raw_spin_lock(&wqe->lock); - /* fatal condition, failed to create the first worker */ - if (!acct->nr_workers) { - struct io_cb_cancel_data match = { - .fn = io_wq_work_match_item, - .data = work, - .cancel_all = false, - }; - - if (io_acct_cancel_pending_work(wqe, acct, &match)) - raw_spin_lock(&wqe->lock); + if (acct->nr_workers) { + raw_spin_unlock(&wqe->lock); + return; } raw_spin_unlock(&wqe->lock); + + /* fatal condition, failed to create the first worker */ + match.fn = io_wq_work_match_item, + match.data = work, + match.cancel_all = false, + + io_acct_cancel_pending_work(wqe, acct, &match); } } @@ -1032,22 +1035,23 @@ static inline void io_wqe_remove_pending(struct io_wqe *wqe, static bool io_acct_cancel_pending_work(struct io_wqe *wqe, struct io_wqe_acct *acct, struct io_cb_cancel_data *match) - __releases(wqe->lock) { struct io_wq_work_node *node, *prev; struct io_wq_work *work; + raw_spin_lock(&acct->lock); wq_list_for_each(node, prev, &acct->work_list) { work = container_of(node, struct io_wq_work, list); if (!match->fn(work, match->data)) continue; io_wqe_remove_pending(wqe, work, prev); - raw_spin_unlock(&wqe->lock); + raw_spin_unlock(&acct->lock); io_run_cancel(work, wqe); match->nr_pending++; /* not safe to continue after unlock */ return true; } + raw_spin_unlock(&acct->lock); return false; } @@ -1061,7 +1065,6 @@ retry: struct io_wqe_acct *acct = io_get_acct(wqe, i == 0); if (io_acct_cancel_pending_work(wqe, acct, match)) { - raw_spin_lock(&wqe->lock); if (match->cancel_all) goto retry; break; @@ -1103,13 +1106,11 @@ enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel, for_each_node(node) { struct io_wqe *wqe = wq->wqes[node]; - raw_spin_lock(&wqe->lock); io_wqe_cancel_pending_work(wqe, &match); - if (match.nr_pending && !match.cancel_all) { - raw_spin_unlock(&wqe->lock); + if (match.nr_pending && !match.cancel_all) return IO_WQ_CANCEL_OK; - } + raw_spin_lock(&wqe->lock); io_wqe_cancel_running_work(wqe, &match); raw_spin_unlock(&wqe->lock); if (match.nr_running && !match.cancel_all) @@ -1190,6 +1191,7 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) acct->index = i; atomic_set(&acct->nr_running, 0); INIT_WQ_LIST(&acct->work_list); + raw_spin_lock_init(&acct->lock); } wqe->wq = wq; raw_spin_lock_init(&wqe->lock); @@ -1282,9 +1284,7 @@ static void io_wq_destroy(struct io_wq *wq) .fn = io_wq_work_match_all, .cancel_all = true, }; - raw_spin_lock(&wqe->lock); io_wqe_cancel_pending_work(wqe, &match); - raw_spin_unlock(&wqe->lock); free_cpumask_var(wqe->cpu_mask); kfree(wqe); } @@ -1376,7 +1376,7 @@ int io_wq_max_workers(struct io_wq *wq, int *new_count) BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND); BUILD_BUG_ON((int) IO_WQ_ACCT_NR != 2); - for (i = 0; i < 2; i++) { + for (i = 0; i < IO_WQ_ACCT_NR; i++) { if (new_count[i] > task_rlimit(current, RLIMIT_NPROC)) new_count[i] = task_rlimit(current, RLIMIT_NPROC); } diff --git a/fs/io_uring.c b/fs/io_uring.c index 4715980e9015..5fa736344b67 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -63,6 +63,7 @@ #include <net/sock.h> #include <net/af_unix.h> #include <net/scm.h> +#include <net/busy_poll.h> #include <linux/anon_inodes.h> #include <linux/sched/mm.h> #include <linux/uaccess.h> @@ -263,11 +264,18 @@ struct io_rsrc_data { bool quiesce; }; +struct io_buffer_list { + struct list_head list; + struct list_head buf_list; + __u16 bgid; +}; + struct io_buffer { struct list_head list; __u64 addr; __u32 len; __u16 bid; + __u16 bgid; }; struct io_restriction { @@ -326,6 +334,14 @@ struct io_submit_state { struct blk_plug plug; }; +struct io_ev_fd { + struct eventfd_ctx *cq_ev_fd; + unsigned int eventfd_async: 1; + struct rcu_head rcu; +}; + +#define IO_BUFFERS_HASH_BITS 5 + struct io_ring_ctx { /* const or read-mostly hot data */ struct { @@ -335,11 +351,11 @@ struct io_ring_ctx { unsigned int flags; unsigned int compat: 1; unsigned int drain_next: 1; - unsigned int eventfd_async: 1; unsigned int restricted: 1; unsigned int off_timeout_used: 1; unsigned int drain_active: 1; unsigned int drain_disabled: 1; + unsigned int has_evfd: 1; } ____cacheline_aligned_in_smp; /* submission data */ @@ -378,7 +394,9 @@ struct io_ring_ctx { struct list_head timeout_list; struct list_head ltimeout_list; struct list_head cq_overflow_list; - struct xarray io_buffers; + struct list_head *io_buffers; + struct list_head io_buffers_cache; + struct list_head apoll_cache; struct xarray personalities; u32 pers_next; unsigned sq_thread_idle; @@ -395,11 +413,16 @@ struct io_ring_ctx { struct list_head sqd_list; unsigned long check_cq_overflow; +#ifdef CONFIG_NET_RX_BUSY_POLL + /* used to track busy poll napi_id */ + struct list_head napi_list; + spinlock_t napi_lock; /* napi_list lock */ +#endif struct { unsigned cached_cq_tail; unsigned cq_entries; - struct eventfd_ctx *cq_ev_fd; + struct io_ev_fd __rcu *io_ev_fd; struct wait_queue_head cq_wait; unsigned cq_extra; atomic_t cq_timeouts; @@ -421,6 +444,8 @@ struct io_ring_ctx { struct hlist_head *cancel_hash; unsigned cancel_hash_bits; bool poll_multi_queue; + + struct list_head io_buffers_comp; } ____cacheline_aligned_in_smp; struct io_restriction restrictions; @@ -436,6 +461,8 @@ struct io_ring_ctx { struct llist_head rsrc_put_llist; struct list_head rsrc_ref_list; spinlock_t rsrc_ref_lock; + + struct list_head io_buffers_pages; }; /* Keep this last, we don't need it for the fast path */ @@ -461,6 +488,11 @@ struct io_ring_ctx { }; }; +/* + * Arbitrary limit, can be raised if need be + */ +#define IO_RINGFD_REG_MAX 16 + struct io_uring_task { /* submission side */ int cached_refs; @@ -476,6 +508,7 @@ struct io_uring_task { struct io_wq_work_list task_list; struct io_wq_work_list prior_task_list; struct callback_head task_work; + struct file **registered_rings; bool task_running; }; @@ -690,6 +723,12 @@ struct io_hardlink { int flags; }; +struct io_msg { + struct file *file; + u64 user_data; + u32 len; +}; + struct io_async_connect { struct sockaddr_storage address; }; @@ -741,6 +780,8 @@ enum { REQ_F_ARM_LTIMEOUT_BIT, REQ_F_ASYNC_DATA_BIT, REQ_F_SKIP_LINK_CQES_BIT, + REQ_F_SINGLE_POLL_BIT, + REQ_F_DOUBLE_POLL_BIT, /* keep async read/write and isreg together and in order */ REQ_F_SUPPORT_NOWAIT_BIT, REQ_F_ISREG_BIT, @@ -799,6 +840,10 @@ enum { REQ_F_ASYNC_DATA = BIT(REQ_F_ASYNC_DATA_BIT), /* don't post CQEs while failing linked requests */ REQ_F_SKIP_LINK_CQES = BIT(REQ_F_SKIP_LINK_CQES_BIT), + /* single poll may be active */ + REQ_F_SINGLE_POLL = BIT(REQ_F_SINGLE_POLL_BIT), + /* double poll may active */ + REQ_F_DOUBLE_POLL = BIT(REQ_F_DOUBLE_POLL_BIT), }; struct async_poll { @@ -825,7 +870,7 @@ enum { * NOTE! Each of the iocb union members has the file pointer * as the first entry in their struct definition. So you can * access the file pointer through any of the sub-structs, - * or directly as just 'ki_filp' in this struct. + * or directly as just 'file' in this struct. */ struct io_kiocb { union { @@ -855,6 +900,7 @@ struct io_kiocb { struct io_mkdir mkdir; struct io_symlink symlink; struct io_hardlink hardlink; + struct io_msg msg; }; u8 opcode; @@ -877,6 +923,7 @@ struct io_kiocb { /* used by request caches, completion batching and iopoll */ struct io_wq_work_node comp_list; atomic_t refs; + atomic_t poll_refs; struct io_kiocb *link; struct io_task_work io_task_work; /* for polled requests, i.e. IORING_OP_POLL_ADD and async armed poll */ @@ -885,12 +932,11 @@ struct io_kiocb { struct async_poll *apoll; /* opcode allocated if it needs to store data for async defer */ void *async_data; - struct io_wq_work work; /* custom credentials, valid IFF REQ_F_CREDS is set */ - const struct cred *creds; /* stores selected buf, valid IFF REQ_F_BUFFER_SELECTED is set */ struct io_buffer *kbuf; - atomic_t poll_refs; + const struct cred *creds; + struct io_wq_work work; }; struct io_tctx_node { @@ -1105,6 +1151,9 @@ static const struct io_op_def io_op_defs[] = { [IORING_OP_MKDIRAT] = {}, [IORING_OP_SYMLINKAT] = {}, [IORING_OP_LINKAT] = {}, + [IORING_OP_MSG_RING] = { + .needs_file = 1, + }, }; /* requests with any of those set should undergo io_disarm_next() */ @@ -1141,6 +1190,7 @@ static int io_install_fixed_file(struct io_kiocb *req, struct file *file, static int io_close_fixed(struct io_kiocb *req, unsigned int issue_flags); static enum hrtimer_restart io_link_timeout_fn(struct hrtimer *timer); +static void io_eventfd_signal(struct io_ring_ctx *ctx); static struct kmem_cache *req_cachep; @@ -1267,36 +1317,88 @@ static inline void io_req_set_rsrc_node(struct io_kiocb *req, } } -static unsigned int __io_put_kbuf(struct io_kiocb *req) +static unsigned int __io_put_kbuf(struct io_kiocb *req, struct list_head *list) { struct io_buffer *kbuf = req->kbuf; unsigned int cflags; - cflags = kbuf->bid << IORING_CQE_BUFFER_SHIFT; - cflags |= IORING_CQE_F_BUFFER; + cflags = IORING_CQE_F_BUFFER | (kbuf->bid << IORING_CQE_BUFFER_SHIFT); req->flags &= ~REQ_F_BUFFER_SELECTED; - kfree(kbuf); + list_add(&kbuf->list, list); req->kbuf = NULL; return cflags; } -static inline unsigned int io_put_kbuf(struct io_kiocb *req) +static inline unsigned int io_put_kbuf_comp(struct io_kiocb *req) +{ + if (likely(!(req->flags & REQ_F_BUFFER_SELECTED))) + return 0; + return __io_put_kbuf(req, &req->ctx->io_buffers_comp); +} + +static inline unsigned int io_put_kbuf(struct io_kiocb *req, + unsigned issue_flags) { + unsigned int cflags; + if (likely(!(req->flags & REQ_F_BUFFER_SELECTED))) return 0; - return __io_put_kbuf(req); + + /* + * We can add this buffer back to two lists: + * + * 1) The io_buffers_cache list. This one is protected by the + * ctx->uring_lock. If we already hold this lock, add back to this + * list as we can grab it from issue as well. + * 2) The io_buffers_comp list. This one is protected by the + * ctx->completion_lock. + * + * We migrate buffers from the comp_list to the issue cache list + * when we need one. + */ + if (issue_flags & IO_URING_F_UNLOCKED) { + struct io_ring_ctx *ctx = req->ctx; + + spin_lock(&ctx->completion_lock); + cflags = __io_put_kbuf(req, &ctx->io_buffers_comp); + spin_unlock(&ctx->completion_lock); + } else { + cflags = __io_put_kbuf(req, &req->ctx->io_buffers_cache); + } + + return cflags; } -static void io_refs_resurrect(struct percpu_ref *ref, struct completion *compl) +static struct io_buffer_list *io_buffer_get_list(struct io_ring_ctx *ctx, + unsigned int bgid) { - bool got = percpu_ref_tryget(ref); + struct list_head *hash_list; + struct io_buffer_list *bl; - /* already at zero, wait for ->release() */ - if (!got) - wait_for_completion(compl); - percpu_ref_resurrect(ref); - if (got) - percpu_ref_put(ref); + hash_list = &ctx->io_buffers[hash_32(bgid, IO_BUFFERS_HASH_BITS)]; + list_for_each_entry(bl, hash_list, list) + if (bl->bgid == bgid || bgid == -1U) + return bl; + + return NULL; +} + +static void io_kbuf_recycle(struct io_kiocb *req) +{ + struct io_ring_ctx *ctx = req->ctx; + struct io_buffer_list *bl; + struct io_buffer *buf; + + if (likely(!(req->flags & REQ_F_BUFFER_SELECTED))) + return; + + lockdep_assert_held(&ctx->uring_lock); + + buf = req->kbuf; + bl = io_buffer_get_list(ctx, buf->bgid); + list_add(&buf->list, &bl->buf_list); + req->flags &= ~REQ_F_BUFFER_SELECTED; + req->kbuf = NULL; } static bool io_match_task(struct io_kiocb *head, struct task_struct *task, @@ -1409,7 +1511,7 @@ static __cold void io_fallback_req_func(struct work_struct *work) static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p) { struct io_ring_ctx *ctx; - int hash_bits; + int i, hash_bits; ctx = kzalloc(sizeof(*ctx), GFP_KERNEL); if (!ctx) @@ -1436,6 +1538,13 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p) /* set invalid range, so io_import_fixed() fails meeting it */ ctx->dummy_ubuf->ubuf = -1UL; + ctx->io_buffers = kcalloc(1U << IO_BUFFERS_HASH_BITS, + sizeof(struct list_head), GFP_KERNEL); + if (!ctx->io_buffers) + goto err; + for (i = 0; i < (1U << IO_BUFFERS_HASH_BITS); i++) + INIT_LIST_HEAD(&ctx->io_buffers[i]); + if (percpu_ref_init(&ctx->refs, io_ring_ctx_ref_free, PERCPU_REF_ALLOW_REINIT, GFP_KERNEL)) goto err; @@ -1444,14 +1553,17 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p) init_waitqueue_head(&ctx->sqo_sq_wait); INIT_LIST_HEAD(&ctx->sqd_list); INIT_LIST_HEAD(&ctx->cq_overflow_list); + INIT_LIST_HEAD(&ctx->io_buffers_cache); + INIT_LIST_HEAD(&ctx->apoll_cache); init_completion(&ctx->ref_comp); - xa_init_flags(&ctx->io_buffers, XA_FLAGS_ALLOC1); xa_init_flags(&ctx->personalities, XA_FLAGS_ALLOC1); mutex_init(&ctx->uring_lock); init_waitqueue_head(&ctx->cq_wait); spin_lock_init(&ctx->completion_lock); spin_lock_init(&ctx->timeout_lock); INIT_WQ_LIST(&ctx->iopoll_list); + INIT_LIST_HEAD(&ctx->io_buffers_pages); + INIT_LIST_HEAD(&ctx->io_buffers_comp); INIT_LIST_HEAD(&ctx->defer_list); INIT_LIST_HEAD(&ctx->timeout_list); INIT_LIST_HEAD(&ctx->ltimeout_list); @@ -1464,10 +1576,15 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p) INIT_WQ_LIST(&ctx->locked_free_list); INIT_DELAYED_WORK(&ctx->fallback_work, io_fallback_req_func); INIT_WQ_LIST(&ctx->submit_state.compl_reqs); +#ifdef CONFIG_NET_RX_BUSY_POLL + INIT_LIST_HEAD(&ctx->napi_list); + spin_lock_init(&ctx->napi_lock); +#endif return ctx; err: kfree(ctx->dummy_ubuf); kfree(ctx->cancel_hash); + kfree(ctx->io_buffers); kfree(ctx); return NULL; } @@ -1610,8 +1727,8 @@ static void io_queue_async_work(struct io_kiocb *req, bool *dont_use) if (WARN_ON_ONCE(!same_thread_group(req->task, current))) req->work.flags |= IO_WQ_WORK_CANCEL; - trace_io_uring_queue_async_work(ctx, io_wq_is_hashed(&req->work), req, - &req->work, req->flags); + trace_io_uring_queue_async_work(ctx, req, req->user_data, req->opcode, req->flags, + &req->work, io_wq_is_hashed(&req->work)); io_wq_enqueue(tctx->io_wq, &req->work); if (link) io_queue_linked_timeout(link); @@ -1681,22 +1798,27 @@ static __cold void io_flush_timeouts(struct io_ring_ctx *ctx) spin_unlock_irq(&ctx->timeout_lock); } -static __cold void __io_commit_cqring_flush(struct io_ring_ctx *ctx) -{ - if (ctx->off_timeout_used) - io_flush_timeouts(ctx); - if (ctx->drain_active) - io_queue_deferred(ctx); -} - static inline void io_commit_cqring(struct io_ring_ctx *ctx) { - if (unlikely(ctx->off_timeout_used || ctx->drain_active)) - __io_commit_cqring_flush(ctx); /* order cqe stores with ring update */ smp_store_release(&ctx->rings->cq.tail, ctx->cached_cq_tail); } +static void __io_commit_cqring_flush(struct io_ring_ctx *ctx) +{ + if (ctx->off_timeout_used || ctx->drain_active) { + spin_lock(&ctx->completion_lock); + if (ctx->off_timeout_used) + io_flush_timeouts(ctx); + if (ctx->drain_active) + io_queue_deferred(ctx); + io_commit_cqring(ctx); + spin_unlock(&ctx->completion_lock); + } + if (ctx->has_evfd) + io_eventfd_signal(ctx); +} + static inline bool io_sqring_full(struct io_ring_ctx *ctx) { struct io_rings *r = ctx->rings; @@ -1726,23 +1848,34 @@ static inline struct io_uring_cqe *io_get_cqe(struct io_ring_ctx *ctx) return &rings->cqes[tail & mask]; } -static inline bool io_should_trigger_evfd(struct io_ring_ctx *ctx) +static void io_eventfd_signal(struct io_ring_ctx *ctx) { - if (likely(!ctx->cq_ev_fd)) - return false; + struct io_ev_fd *ev_fd; + + rcu_read_lock(); + /* + * rcu_dereference ctx->io_ev_fd once and use it for both for checking + * and eventfd_signal + */ + ev_fd = rcu_dereference(ctx->io_ev_fd); + + /* + * Check again if ev_fd exists incase an io_eventfd_unregister call + * completed between the NULL check of ctx->io_ev_fd at the start of + * the function and rcu_read_lock. + */ + if (unlikely(!ev_fd)) + goto out; if (READ_ONCE(ctx->rings->cq_flags) & IORING_CQ_EVENTFD_DISABLED) - return false; - return !ctx->eventfd_async || io_wq_current_is_worker(); + goto out; + + if (!ev_fd->eventfd_async || io_wq_current_is_worker()) + eventfd_signal(ev_fd->cq_ev_fd, 1); +out: + rcu_read_unlock(); } -/* - * This should only get called when at least one event has been posted. - * Some applications rely on the eventfd notification count only changing - * IFF a new CQE has been added to the CQ ring. There's no depedency on - * 1:1 relationship between how many times this function is called (and - * hence the eventfd count) and number of CQEs posted to the CQ ring. - */ -static void io_cqring_ev_posted(struct io_ring_ctx *ctx) +static inline void io_cqring_wake(struct io_ring_ctx *ctx) { /* * wake_up_all() may seem excessive, but io_wake_function() and @@ -1751,21 +1884,32 @@ static void io_cqring_ev_posted(struct io_ring_ctx *ctx) */ if (wq_has_sleeper(&ctx->cq_wait)) wake_up_all(&ctx->cq_wait); - if (io_should_trigger_evfd(ctx)) - eventfd_signal(ctx->cq_ev_fd, 1); +} + +/* + * This should only get called when at least one event has been posted. + * Some applications rely on the eventfd notification count only changing + * IFF a new CQE has been added to the CQ ring. There's no depedency on + * 1:1 relationship between how many times this function is called (and + * hence the eventfd count) and number of CQEs posted to the CQ ring. + */ +static inline void io_cqring_ev_posted(struct io_ring_ctx *ctx) +{ + if (unlikely(ctx->off_timeout_used || ctx->drain_active || + ctx->has_evfd)) + __io_commit_cqring_flush(ctx); + + io_cqring_wake(ctx); } static void io_cqring_ev_posted_iopoll(struct io_ring_ctx *ctx) { - /* see waitqueue_active() comment */ - smp_mb(); + if (unlikely(ctx->off_timeout_used || ctx->drain_active || + ctx->has_evfd)) + __io_commit_cqring_flush(ctx); - if (ctx->flags & IORING_SETUP_SQPOLL) { - if (waitqueue_active(&ctx->cq_wait)) - wake_up_all(&ctx->cq_wait); - } - if (io_should_trigger_evfd(ctx)) - eventfd_signal(ctx->cq_ev_fd, 1); + if (ctx->flags & IORING_SETUP_SQPOLL) + io_cqring_wake(ctx); } /* Returns true if there are no backlogged entries after the flush */ @@ -1905,8 +2049,6 @@ static inline bool __io_fill_cqe(struct io_ring_ctx *ctx, u64 user_data, { struct io_uring_cqe *cqe; - trace_io_uring_complete(ctx, user_data, res, cflags); - /* * If we can't get a cq entry, userspace overflowed the * submission (by quite a lot). Increment the overflow count in @@ -1922,16 +2064,23 @@ static inline bool __io_fill_cqe(struct io_ring_ctx *ctx, u64 user_data, return io_cqring_event_overflow(ctx, user_data, res, cflags); } +static inline bool __io_fill_cqe_req(struct io_kiocb *req, s32 res, u32 cflags) +{ + trace_io_uring_complete(req->ctx, req, req->user_data, res, cflags); + return __io_fill_cqe(req->ctx, req->user_data, res, cflags); +} + static noinline void io_fill_cqe_req(struct io_kiocb *req, s32 res, u32 cflags) { if (!(req->flags & REQ_F_CQE_SKIP)) - __io_fill_cqe(req->ctx, req->user_data, res, cflags); + __io_fill_cqe_req(req, res, cflags); } static noinline bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags) { ctx->cq_extra++; + trace_io_uring_complete(ctx, NULL, user_data, res, cflags); return __io_fill_cqe(ctx, user_data, res, cflags); } @@ -1941,7 +2090,7 @@ static void __io_req_complete_post(struct io_kiocb *req, s32 res, struct io_ring_ctx *ctx = req->ctx; if (!(req->flags & REQ_F_CQE_SKIP)) - __io_fill_cqe(ctx, req->user_data, res, cflags); + __io_fill_cqe_req(req, res, cflags); /* * If we're the last reference to this request, add to our locked * free_list cache. @@ -2000,7 +2149,7 @@ static inline void io_req_complete(struct io_kiocb *req, s32 res) static void io_req_complete_failed(struct io_kiocb *req, s32 res) { req_set_fail(req); - io_req_complete_post(req, res, 0); + io_req_complete_post(req, res, io_put_kbuf(req, 0)); } static void io_req_complete_fail_submit(struct io_kiocb *req) @@ -2183,7 +2332,9 @@ static void io_fail_links(struct io_kiocb *req) nxt = link->link; link->link = NULL; - trace_io_uring_fail_link(req, link); + trace_io_uring_fail_link(req->ctx, req, req->user_data, + req->opcode, link); + if (!ignore_cqes) { link->flags &= ~REQ_F_CQE_SKIP; io_fill_cqe_req(link, res, 0); @@ -2302,7 +2453,8 @@ static void handle_prev_tw_list(struct io_wq_work_node *node, if (likely(*uring_locked)) req->io_task_work.func(req, uring_locked); else - __io_req_complete_post(req, req->result, io_put_kbuf(req)); + __io_req_complete_post(req, req->result, + io_put_kbuf_comp(req)); node = next; } while (node); @@ -2530,8 +2682,16 @@ static void __io_submit_flush_completions(struct io_ring_ctx *ctx) comp_list); if (!(req->flags & REQ_F_CQE_SKIP)) - __io_fill_cqe(ctx, req->user_data, req->result, - req->cflags); + __io_fill_cqe_req(req, req->result, req->cflags); + if ((req->flags & REQ_F_POLLED) && req->apoll) { + struct async_poll *apoll = req->apoll; + + if (apoll->double_poll) + kfree(apoll->double_poll); + list_add(&apoll->poll.wait.entry, + &ctx->apoll_cache); + req->flags &= ~REQ_F_POLLED; + } } io_commit_cqring(ctx); @@ -2653,7 +2813,7 @@ static int io_do_iopoll(struct io_ring_ctx *ctx, bool force_nonspin) if (unlikely(req->flags & REQ_F_CQE_SKIP)) continue; - __io_fill_cqe(ctx, req->user_data, req->result, io_put_kbuf(req)); + __io_fill_cqe_req(req, req->result, io_put_kbuf(req, 0)); nr_events++; } @@ -2829,14 +2989,14 @@ static bool __io_complete_rw_common(struct io_kiocb *req, long res) static inline void io_req_task_complete(struct io_kiocb *req, bool *locked) { - unsigned int cflags = io_put_kbuf(req); int res = req->result; if (*locked) { - io_req_complete_state(req, res, cflags); + io_req_complete_state(req, res, io_put_kbuf(req, 0)); io_req_add_compl_list(req); } else { - io_req_complete_post(req, res, cflags); + io_req_complete_post(req, res, + io_put_kbuf(req, IO_URING_F_UNLOCKED)); } } @@ -2845,7 +3005,8 @@ static void __io_complete_rw(struct io_kiocb *req, long res, { if (__io_complete_rw_common(req, res)) return; - __io_req_complete(req, issue_flags, req->result, io_put_kbuf(req)); + __io_req_complete(req, issue_flags, req->result, + io_put_kbuf(req, issue_flags)); } static void io_complete_rw(struct kiocb *kiocb, long res) @@ -3000,14 +3161,6 @@ static int io_prep_rw(struct io_kiocb *req, const struct io_uring_sqe *sqe) req->flags |= io_file_get_flags(file) << REQ_F_SUPPORT_NOWAIT_BIT; kiocb->ki_pos = READ_ONCE(sqe->off); - if (kiocb->ki_pos == -1) { - if (!(file->f_mode & FMODE_STREAM)) { - req->flags |= REQ_F_CUR_POS; - kiocb->ki_pos = file->f_pos; - } else { - kiocb->ki_pos = 0; - } - } kiocb->ki_flags = iocb_flags(file); ret = kiocb_set_rw_flags(kiocb, READ_ONCE(sqe->rw_flags)); if (unlikely(ret)) @@ -3074,6 +3227,24 @@ static inline void io_rw_done(struct kiocb *kiocb, ssize_t ret) } } +static inline loff_t *io_kiocb_update_pos(struct io_kiocb *req) +{ + struct kiocb *kiocb = &req->rw.kiocb; + bool is_stream = req->file->f_mode & FMODE_STREAM; + + if (kiocb->ki_pos == -1) { + if (!is_stream) { + req->flags |= REQ_F_CUR_POS; + kiocb->ki_pos = req->file->f_pos; + return &kiocb->ki_pos; + } else { + kiocb->ki_pos = 0; + return NULL; + } + } + return is_stream ? NULL : &kiocb->ki_pos; +} + static void kiocb_done(struct io_kiocb *req, ssize_t ret, unsigned int issue_flags) { @@ -3096,14 +3267,10 @@ static void kiocb_done(struct io_kiocb *req, ssize_t ret, if (req->flags & REQ_F_REISSUE) { req->flags &= ~REQ_F_REISSUE; - if (io_resubmit_prep(req)) { + if (io_resubmit_prep(req)) io_req_task_queue_reissue(req); - } else { - req_set_fail(req); - req->result = ret; - req->io_task_work.func = io_req_task_complete; - io_req_task_work_add(req, false); - } + else + io_req_task_queue_fail(req, ret); } } @@ -3201,30 +3368,36 @@ static void io_ring_submit_lock(struct io_ring_ctx *ctx, bool needs_lock) mutex_lock(&ctx->uring_lock); } +static void io_buffer_add_list(struct io_ring_ctx *ctx, + struct io_buffer_list *bl, unsigned int bgid) +{ + struct list_head *list; + + list = &ctx->io_buffers[hash_32(bgid, IO_BUFFERS_HASH_BITS)]; + INIT_LIST_HEAD(&bl->buf_list); + bl->bgid = bgid; + list_add(&bl->list, list); +} + static struct io_buffer *io_buffer_select(struct io_kiocb *req, size_t *len, int bgid, unsigned int issue_flags) { struct io_buffer *kbuf = req->kbuf; - struct io_buffer *head; bool needs_lock = issue_flags & IO_URING_F_UNLOCKED; + struct io_ring_ctx *ctx = req->ctx; + struct io_buffer_list *bl; if (req->flags & REQ_F_BUFFER_SELECTED) return kbuf; - io_ring_submit_lock(req->ctx, needs_lock); + io_ring_submit_lock(ctx, needs_lock); - lockdep_assert_held(&req->ctx->uring_lock); + lockdep_assert_held(&ctx->uring_lock); - head = xa_load(&req->ctx->io_buffers, bgid); - if (head) { - if (!list_empty(&head->list)) { - kbuf = list_last_entry(&head->list, struct io_buffer, - list); - list_del(&kbuf->list); - } else { - kbuf = head; - xa_erase(&req->ctx->io_buffers, bgid); - } + bl = io_buffer_get_list(ctx, bgid); + if (bl && !list_empty(&bl->buf_list)) { + kbuf = list_first_entry(&bl->buf_list, struct io_buffer, list); + list_del(&kbuf->list); if (*len > kbuf->len) *len = kbuf->len; req->flags |= REQ_F_BUFFER_SELECTED; @@ -3400,6 +3573,7 @@ static ssize_t loop_rw_iter(int rw, struct io_kiocb *req, struct iov_iter *iter) struct kiocb *kiocb = &req->rw.kiocb; struct file *file = req->file; ssize_t ret = 0; + loff_t *ppos; /* * Don't support polled IO through this interface, and we can't @@ -3412,6 +3586,8 @@ static ssize_t loop_rw_iter(int rw, struct io_kiocb *req, struct iov_iter *iter) !(kiocb->ki_filp->f_flags & O_NONBLOCK)) return -EAGAIN; + ppos = io_kiocb_ppos(kiocb); + while (iov_iter_count(iter)) { struct iovec iovec; ssize_t nr; @@ -3425,10 +3601,10 @@ static ssize_t loop_rw_iter(int rw, struct io_kiocb *req, struct iov_iter *iter) if (rw == READ) { nr = file->f_op->read(file, iovec.iov_base, - iovec.iov_len, io_kiocb_ppos(kiocb)); + iovec.iov_len, ppos); } else { nr = file->f_op->write(file, iovec.iov_base, - iovec.iov_len, io_kiocb_ppos(kiocb)); + iovec.iov_len, ppos); } if (nr < 0) { @@ -3436,13 +3612,15 @@ static ssize_t loop_rw_iter(int rw, struct io_kiocb *req, struct iov_iter *iter) ret = nr; break; } + ret += nr; if (!iov_iter_is_bvec(iter)) { iov_iter_advance(iter, nr); } else { - req->rw.len -= nr; req->rw.addr += nr; + req->rw.len -= nr; + if (!req->rw.len) + break; } - ret += nr; if (nr != iovec.iov_len) break; } @@ -3629,12 +3807,23 @@ static int io_read(struct io_kiocb *req, unsigned int issue_flags) bool force_nonblock = issue_flags & IO_URING_F_NONBLOCK; struct io_async_rw *rw; ssize_t ret, ret2; + loff_t *ppos; if (!req_has_async_data(req)) { ret = io_import_iovec(READ, req, &iovec, s, issue_flags); if (unlikely(ret < 0)) return ret; } else { + /* + * Safe and required to re-import if we're using provided + * buffers, as we dropped the selected one before retry. + */ + if (req->flags & REQ_F_BUFFER_SELECT) { + ret = io_import_iovec(READ, req, &iovec, s, issue_flags); + if (unlikely(ret < 0)) + return ret; + } + rw = req->async_data; s = &rw->s; /* @@ -3659,7 +3848,9 @@ static int io_read(struct io_kiocb *req, unsigned int issue_flags) kiocb->ki_flags &= ~IOCB_NOWAIT; } - ret = rw_verify_area(READ, req->file, io_kiocb_ppos(kiocb), req->result); + ppos = io_kiocb_update_pos(req); + + ret = rw_verify_area(READ, req->file, ppos, req->result); if (unlikely(ret)) { kfree(iovec); return ret; @@ -3669,6 +3860,9 @@ static int io_read(struct io_kiocb *req, unsigned int issue_flags) if (ret == -EAGAIN || (req->flags & REQ_F_REISSUE)) { req->flags &= ~REQ_F_REISSUE; + /* if we can poll, just do that */ + if (req->opcode == IORING_OP_READ && file_can_poll(req->file)) + return -EAGAIN; /* IOPOLL retry should happen for io-wq threads */ if (!force_nonblock && !(req->ctx->flags & IORING_SETUP_IOPOLL)) goto done; @@ -3758,6 +3952,7 @@ static int io_write(struct io_kiocb *req, unsigned int issue_flags) struct kiocb *kiocb = &req->rw.kiocb; bool force_nonblock = issue_flags & IO_URING_F_NONBLOCK; ssize_t ret, ret2; + loff_t *ppos; if (!req_has_async_data(req)) { ret = io_import_iovec(WRITE, req, &iovec, s, issue_flags); @@ -3788,7 +3983,9 @@ static int io_write(struct io_kiocb *req, unsigned int issue_flags) kiocb->ki_flags &= ~IOCB_NOWAIT; } - ret = rw_verify_area(WRITE, req->file, io_kiocb_ppos(kiocb), req->result); + ppos = io_kiocb_update_pos(req); + + ret = rw_verify_area(WRITE, req->file, ppos, req->result); if (unlikely(ret)) goto out_free; @@ -4235,6 +4432,45 @@ static int io_nop(struct io_kiocb *req, unsigned int issue_flags) return 0; } +static int io_msg_ring_prep(struct io_kiocb *req, + const struct io_uring_sqe *sqe) +{ + if (unlikely(sqe->addr || sqe->ioprio || sqe->rw_flags || + sqe->splice_fd_in || sqe->buf_index || sqe->personality)) + return -EINVAL; + + if (req->file->f_op != &io_uring_fops) + return -EBADFD; + + req->msg.user_data = READ_ONCE(sqe->off); + req->msg.len = READ_ONCE(sqe->len); + return 0; +} + +static int io_msg_ring(struct io_kiocb *req, unsigned int issue_flags) +{ + struct io_ring_ctx *target_ctx; + struct io_msg *msg = &req->msg; + int ret = -EOVERFLOW; + bool filled; + + target_ctx = req->file->private_data; + + spin_lock(&target_ctx->completion_lock); + filled = io_fill_cqe_aux(target_ctx, msg->user_data, msg->len, + IORING_CQE_F_MSG); + io_commit_cqring(target_ctx); + spin_unlock(&target_ctx->completion_lock); + + if (filled) { + io_cqring_ev_posted(target_ctx); + ret = 0; + } + + __io_req_complete(req, issue_flags, ret, 0); + return 0; +} + static int io_fsync_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) { struct io_ring_ctx *ctx = req->ctx; @@ -4458,8 +4694,8 @@ static int io_remove_buffers_prep(struct io_kiocb *req, return 0; } -static int __io_remove_buffers(struct io_ring_ctx *ctx, struct io_buffer *buf, - int bgid, unsigned nbufs) +static int __io_remove_buffers(struct io_ring_ctx *ctx, + struct io_buffer_list *bl, unsigned nbufs) { unsigned i = 0; @@ -4468,19 +4704,16 @@ static int __io_remove_buffers(struct io_ring_ctx *ctx, struct io_buffer *buf, return 0; /* the head kbuf is the list itself */ - while (!list_empty(&buf->list)) { + while (!list_empty(&bl->buf_list)) { struct io_buffer *nxt; - nxt = list_first_entry(&buf->list, struct io_buffer, list); + nxt = list_first_entry(&bl->buf_list, struct io_buffer, list); list_del(&nxt->list); - kfree(nxt); if (++i == nbufs) return i; cond_resched(); } i++; - kfree(buf); - xa_erase(&ctx->io_buffers, bgid); return i; } @@ -4489,7 +4722,7 @@ static int io_remove_buffers(struct io_kiocb *req, unsigned int issue_flags) { struct io_provide_buf *p = &req->pbuf; struct io_ring_ctx *ctx = req->ctx; - struct io_buffer *head; + struct io_buffer_list *bl; int ret = 0; bool needs_lock = issue_flags & IO_URING_F_UNLOCKED; @@ -4498,9 +4731,9 @@ static int io_remove_buffers(struct io_kiocb *req, unsigned int issue_flags) lockdep_assert_held(&ctx->uring_lock); ret = -ENOENT; - head = xa_load(&ctx->io_buffers, p->bgid); - if (head) - ret = __io_remove_buffers(ctx, head, p->bgid, p->nbufs); + bl = io_buffer_get_list(ctx, p->bgid); + if (bl) + ret = __io_remove_buffers(ctx, bl, p->nbufs); if (ret < 0) req_set_fail(req); @@ -4545,39 +4778,80 @@ static int io_provide_buffers_prep(struct io_kiocb *req, return 0; } -static int io_add_buffers(struct io_provide_buf *pbuf, struct io_buffer **head) +static int io_refill_buffer_cache(struct io_ring_ctx *ctx) +{ + struct io_buffer *buf; + struct page *page; + int bufs_in_page; + + /* + * Completions that don't happen inline (eg not under uring_lock) will + * add to ->io_buffers_comp. If we don't have any free buffers, check + * the completion list and splice those entries first. + */ + if (!list_empty_careful(&ctx->io_buffers_comp)) { + spin_lock(&ctx->completion_lock); + if (!list_empty(&ctx->io_buffers_comp)) { + list_splice_init(&ctx->io_buffers_comp, + &ctx->io_buffers_cache); + spin_unlock(&ctx->completion_lock); + return 0; + } + spin_unlock(&ctx->completion_lock); + } + + /* + * No free buffers and no completion entries either. Allocate a new + * page worth of buffer entries and add those to our freelist. + */ + page = alloc_page(GFP_KERNEL_ACCOUNT); + if (!page) + return -ENOMEM; + + list_add(&page->lru, &ctx->io_buffers_pages); + + buf = page_address(page); + bufs_in_page = PAGE_SIZE / sizeof(*buf); + while (bufs_in_page) { + list_add_tail(&buf->list, &ctx->io_buffers_cache); + buf++; + bufs_in_page--; + } + + return 0; +} + +static int io_add_buffers(struct io_ring_ctx *ctx, struct io_provide_buf *pbuf, + struct io_buffer_list *bl) { struct io_buffer *buf; u64 addr = pbuf->addr; int i, bid = pbuf->bid; for (i = 0; i < pbuf->nbufs; i++) { - buf = kmalloc(sizeof(*buf), GFP_KERNEL_ACCOUNT); - if (!buf) + if (list_empty(&ctx->io_buffers_cache) && + io_refill_buffer_cache(ctx)) break; - + buf = list_first_entry(&ctx->io_buffers_cache, struct io_buffer, + list); + list_move_tail(&buf->list, &bl->buf_list); buf->addr = addr; buf->len = min_t(__u32, pbuf->len, MAX_RW_COUNT); buf->bid = bid; + buf->bgid = pbuf->bgid; addr += pbuf->len; bid++; - if (!*head) { - INIT_LIST_HEAD(&buf->list); - *head = buf; - } else { - list_add_tail(&buf->list, &(*head)->list); - } cond_resched(); } - return i ? i : -ENOMEM; + return i ? 0 : -ENOMEM; } static int io_provide_buffers(struct io_kiocb *req, unsigned int issue_flags) { struct io_provide_buf *p = &req->pbuf; struct io_ring_ctx *ctx = req->ctx; - struct io_buffer *head, *list; + struct io_buffer_list *bl; int ret = 0; bool needs_lock = issue_flags & IO_URING_F_UNLOCKED; @@ -4585,14 +4859,18 @@ static int io_provide_buffers(struct io_kiocb *req, unsigned int issue_flags) lockdep_assert_held(&ctx->uring_lock); - list = head = xa_load(&ctx->io_buffers, p->bgid); - - ret = io_add_buffers(p, &head); - if (ret >= 0 && !list) { - ret = xa_insert(&ctx->io_buffers, p->bgid, head, GFP_KERNEL); - if (ret < 0) - __io_remove_buffers(ctx, head, p->bgid, -1U); + bl = io_buffer_get_list(ctx, p->bgid); + if (unlikely(!bl)) { + bl = kmalloc(sizeof(*bl), GFP_KERNEL); + if (!bl) { + ret = -ENOMEM; + goto err; + } + io_buffer_add_list(ctx, bl, p->bgid); } + + ret = io_add_buffers(ctx, p, bl); +err: if (ret < 0) req_set_fail(req); /* complete before unlock, IOPOLL may need the lock */ @@ -5184,7 +5462,7 @@ static int io_recvmsg(struct io_kiocb *req, unsigned int issue_flags) if (kmsg->free_iov) kfree(kmsg->free_iov); req->flags &= ~REQ_F_NEED_CLEANUP; - __io_req_complete(req, issue_flags, ret, io_put_kbuf(req)); + __io_req_complete(req, issue_flags, ret, io_put_kbuf(req, issue_flags)); return 0; } @@ -5239,7 +5517,8 @@ static int io_recv(struct io_kiocb *req, unsigned int issue_flags) out_free: req_set_fail(req); } - __io_req_complete(req, issue_flags, ret, io_put_kbuf(req)); + + __io_req_complete(req, issue_flags, ret, io_put_kbuf(req, issue_flags)); return 0; } @@ -5258,8 +5537,7 @@ static int io_accept_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) accept->nofile = rlimit(RLIMIT_NOFILE); accept->file_slot = READ_ONCE(sqe->file_index); - if (accept->file_slot && ((req->open.how.flags & O_CLOEXEC) || - (accept->flags & SOCK_CLOEXEC))) + if (accept->file_slot && (accept->flags & SOCK_CLOEXEC)) return -EINVAL; if (accept->flags & ~(SOCK_CLOEXEC | SOCK_NONBLOCK)) return -EINVAL; @@ -5399,6 +5677,108 @@ IO_NETOP_FN(send); IO_NETOP_FN(recv); #endif /* CONFIG_NET */ +#ifdef CONFIG_NET_RX_BUSY_POLL + +#define NAPI_TIMEOUT (60 * SEC_CONVERSION) + +struct napi_entry { + struct list_head list; + unsigned int napi_id; + unsigned long timeout; +}; + +/* + * Add busy poll NAPI ID from sk. + */ +static void io_add_napi(struct file *file, struct io_ring_ctx *ctx) +{ + unsigned int napi_id; + struct socket *sock; + struct sock *sk; + struct napi_entry *ne; + + if (!net_busy_loop_on()) + return; + + sock = sock_from_file(file); + if (!sock) + return; + + sk = sock->sk; + if (!sk) + return; + + napi_id = READ_ONCE(sk->sk_napi_id); + + /* Non-NAPI IDs can be rejected */ + if (napi_id < MIN_NAPI_ID) + return; + + spin_lock(&ctx->napi_lock); + list_for_each_entry(ne, &ctx->napi_list, list) { + if (ne->napi_id == napi_id) { + ne->timeout = jiffies + NAPI_TIMEOUT; + goto out; + } + } + + ne = kmalloc(sizeof(*ne), GFP_NOWAIT); + if (!ne) + goto out; + + ne->napi_id = napi_id; + ne->timeout = jiffies + NAPI_TIMEOUT; + list_add_tail(&ne->list, &ctx->napi_list); +out: + spin_unlock(&ctx->napi_lock); +} + +static inline void io_check_napi_entry_timeout(struct napi_entry *ne) +{ + if (time_after(jiffies, ne->timeout)) { + list_del(&ne->list); + kfree(ne); + } +} + +/* + * Busy poll if globally on and supporting sockets found + */ +static bool io_napi_busy_loop(struct list_head *napi_list) +{ + struct napi_entry *ne, *n; + + list_for_each_entry_safe(ne, n, napi_list, list) { + napi_busy_loop(ne->napi_id, NULL, NULL, true, + BUSY_POLL_BUDGET); + io_check_napi_entry_timeout(ne); + } + return !list_empty(napi_list); +} + +static void io_free_napi_list(struct io_ring_ctx *ctx) +{ + spin_lock(&ctx->napi_lock); + while (!list_empty(&ctx->napi_list)) { + struct napi_entry *ne = + list_first_entry(&ctx->napi_list, struct napi_entry, + list); + + list_del(&ne->list); + kfree(ne); + } + spin_unlock(&ctx->napi_lock); +} +#else +static inline void io_add_napi(struct file *file, struct io_ring_ctx *ctx) +{ +} + +static inline void io_free_napi_list(struct io_ring_ctx *ctx) +{ +} +#endif /* CONFIG_NET_RX_BUSY_POLL */ + struct io_poll_table { struct poll_table_struct pt; struct io_kiocb *req; @@ -5474,8 +5854,12 @@ static inline void io_poll_remove_entry(struct io_poll_iocb *poll) static void io_poll_remove_entries(struct io_kiocb *req) { - struct io_poll_iocb *poll = io_poll_get_single(req); - struct io_poll_iocb *poll_double = io_poll_get_double(req); + /* + * Nothing to do if neither of those flags are set. Avoid dipping + * into the poll/apoll/double cachelines if we can. + */ + if (!(req->flags & (REQ_F_SINGLE_POLL | REQ_F_DOUBLE_POLL))) + return; /* * While we hold the waitqueue lock and the waitqueue is nonempty, @@ -5493,9 +5877,10 @@ static void io_poll_remove_entries(struct io_kiocb *req) * In that case, only RCU prevents the queue memory from being freed. */ rcu_read_lock(); - io_poll_remove_entry(poll); - if (poll_double) - io_poll_remove_entry(poll_double); + if (req->flags & REQ_F_SINGLE_POLL) + io_poll_remove_entry(io_poll_get_single(req)); + if (req->flags & REQ_F_DOUBLE_POLL) + io_poll_remove_entry(io_poll_get_double(req)); rcu_read_unlock(); } @@ -5527,13 +5912,13 @@ static int io_poll_check_events(struct io_kiocb *req) return -ECANCELED; if (!req->result) { - struct poll_table_struct pt = { ._key = poll->events }; + struct poll_table_struct pt = { ._key = req->cflags }; - req->result = vfs_poll(req->file, &pt) & poll->events; + req->result = vfs_poll(req->file, &pt) & req->cflags; } /* multishot, just fill an CQE and proceed */ - if (req->result && !(poll->events & EPOLLONESHOT)) { + if (req->result && !(req->cflags & EPOLLONESHOT)) { __poll_t mask = mangle_poll(req->result & poll->events); bool filled; @@ -5545,6 +5930,7 @@ static int io_poll_check_events(struct io_kiocb *req) if (unlikely(!filled)) return -ECANCELED; io_cqring_ev_posted(ctx); + io_add_napi(req->file, ctx); } else if (req->result) { return 0; } @@ -5603,29 +5989,36 @@ static void io_apoll_task_func(struct io_kiocb *req, bool *locked) io_req_complete_failed(req, ret); } -static void __io_poll_execute(struct io_kiocb *req, int mask) +static void __io_poll_execute(struct io_kiocb *req, int mask, int events) { req->result = mask; + /* + * This is useful for poll that is armed on behalf of another + * request, and where the wakeup path could be on a different + * CPU. We want to avoid pulling in req->apoll->events for that + * case. + */ + req->cflags = events; if (req->opcode == IORING_OP_POLL_ADD) req->io_task_work.func = io_poll_task_func; else req->io_task_work.func = io_apoll_task_func; - trace_io_uring_task_add(req->ctx, req->opcode, req->user_data, mask); + trace_io_uring_task_add(req->ctx, req, req->user_data, req->opcode, mask); io_req_task_work_add(req, false); } -static inline void io_poll_execute(struct io_kiocb *req, int res) +static inline void io_poll_execute(struct io_kiocb *req, int res, int events) { if (io_poll_get_ownership(req)) - __io_poll_execute(req, res); + __io_poll_execute(req, res, events); } static void io_poll_cancel_req(struct io_kiocb *req) { io_poll_mark_cancelled(req); /* kick tw, which should complete the request */ - io_poll_execute(req, 0); + io_poll_execute(req, 0, 0); } static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync, @@ -5639,7 +6032,7 @@ static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync, if (unlikely(mask & POLLFREE)) { io_poll_mark_cancelled(req); /* we have to kick tw in case it's not already */ - io_poll_execute(req, 0); + io_poll_execute(req, 0, poll->events); /* * If the waitqueue is being freed early but someone is already @@ -5669,8 +6062,9 @@ static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync, if (mask && poll->events & EPOLLONESHOT) { list_del_init(&poll->wait.entry); poll->head = NULL; + req->flags &= ~REQ_F_SINGLE_POLL; } - __io_poll_execute(req, mask); + __io_poll_execute(req, mask, poll->events); } return 1; } @@ -5705,12 +6099,14 @@ static void __io_queue_proc(struct io_poll_iocb *poll, struct io_poll_table *pt, pt->error = -ENOMEM; return; } + req->flags |= REQ_F_DOUBLE_POLL; io_init_poll_iocb(poll, first->events, first->wait.func); *poll_ptr = poll; if (req->opcode == IORING_OP_POLL_ADD) req->flags |= REQ_F_ASYNC_DATA; } + req->flags |= REQ_F_SINGLE_POLL; pt->nr_entries++; poll->head = head; poll->wait.private = req; @@ -5774,9 +6170,10 @@ static int __io_arm_poll_handler(struct io_kiocb *req, /* can't multishot if failed, just queue the event we've got */ if (unlikely(ipt->error || !ipt->nr_entries)) poll->events |= EPOLLONESHOT; - __io_poll_execute(req, mask); + __io_poll_execute(req, mask, poll->events); return 0; } + io_add_napi(req->file, req->ctx); /* * Release ownership. If someone tried to queue a tw while it was @@ -5784,7 +6181,7 @@ static int __io_arm_poll_handler(struct io_kiocb *req, */ v = atomic_dec_return(&req->poll_refs); if (unlikely(v & IO_POLL_REF_MASK)) - __io_poll_execute(req, 0); + __io_poll_execute(req, 0, poll->events); return 0; } @@ -5803,7 +6200,7 @@ enum { IO_APOLL_READY }; -static int io_arm_poll_handler(struct io_kiocb *req) +static int io_arm_poll_handler(struct io_kiocb *req, unsigned issue_flags) { const struct io_op_def *def = &io_op_defs[req->opcode]; struct io_ring_ctx *ctx = req->ctx; @@ -5828,9 +6225,16 @@ static int io_arm_poll_handler(struct io_kiocb *req) mask |= POLLOUT | POLLWRNORM; } - apoll = kmalloc(sizeof(*apoll), GFP_ATOMIC); - if (unlikely(!apoll)) - return IO_APOLL_ABORTED; + if (!(issue_flags & IO_URING_F_UNLOCKED) && + !list_empty(&ctx->apoll_cache)) { + apoll = list_first_entry(&ctx->apoll_cache, struct async_poll, + poll.wait.entry); + list_del_init(&apoll->poll.wait.entry); + } else { + apoll = kmalloc(sizeof(*apoll), GFP_ATOMIC); + if (unlikely(!apoll)) + return IO_APOLL_ABORTED; + } apoll->double_poll = NULL; req->apoll = apoll; req->flags |= REQ_F_POLLED; @@ -5840,7 +6244,7 @@ static int io_arm_poll_handler(struct io_kiocb *req) if (ret || ipt.error) return ret ? IO_APOLL_READY : IO_APOLL_ABORTED; - trace_io_uring_poll_arm(ctx, req, req->opcode, req->user_data, + trace_io_uring_poll_arm(ctx, req, req->user_data, req->opcode, mask, apoll->poll.events); return IO_APOLL_OK; } @@ -5975,7 +6379,7 @@ static int io_poll_add_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe return -EINVAL; io_req_set_refcount(req); - poll->events = io_poll_parse_events(sqe, flags); + req->cflags = poll->events = io_poll_parse_events(sqe, flags); return 0; } @@ -6092,10 +6496,7 @@ static int io_timeout_cancel(struct io_ring_ctx *ctx, __u64 user_data) if (IS_ERR(req)) return PTR_ERR(req); - - req_set_fail(req); - io_fill_cqe_req(req, -ECANCELED, 0); - io_put_req_deferred(req); + io_req_task_queue_fail(req, -ECANCELED); return 0; } @@ -6568,6 +6969,8 @@ static int io_req_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) return io_symlinkat_prep(req, sqe); case IORING_OP_LINKAT: return io_linkat_prep(req, sqe); + case IORING_OP_MSG_RING: + return io_msg_ring_prep(req, sqe); } printk_once(KERN_WARNING "io_uring: unhandled opcode %d\n", @@ -6649,7 +7052,7 @@ fail: goto queue; } - trace_io_uring_defer(ctx, req, req->user_data); + trace_io_uring_defer(ctx, req, req->user_data, req->opcode); de->req = req; de->seq = seq; list_add_tail(&de->list, &ctx->defer_list); @@ -6659,7 +7062,7 @@ fail: static void io_clean_op(struct io_kiocb *req) { if (req->flags & REQ_F_BUFFER_SELECTED) - io_put_kbuf(req); + io_put_kbuf_comp(req); if (req->flags & REQ_F_NEED_CLEANUP) { switch (req->opcode) { @@ -6851,6 +7254,9 @@ static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags) case IORING_OP_LINKAT: ret = io_linkat(req, issue_flags); break; + case IORING_OP_MSG_RING: + ret = io_msg_ring(req, issue_flags); + break; default: ret = -EINVAL; break; @@ -6926,7 +7332,7 @@ static void io_wq_submit_work(struct io_wq_work *work) continue; } - if (io_arm_poll_handler(req) == IO_APOLL_OK) + if (io_arm_poll_handler(req, issue_flags) == IO_APOLL_OK) return; /* aborted or ready, in either case retry blocking */ needs_poll = false; @@ -6983,7 +7389,7 @@ static struct file *io_file_get_normal(struct io_ring_ctx *ctx, { struct file *file = fget(fd); - trace_io_uring_file_get(ctx, fd); + trace_io_uring_file_get(ctx, req, req->user_data, fd); /* we don't allow fixed io_uring files */ if (file && unlikely(file->f_op == &io_uring_fops)) @@ -7072,7 +7478,7 @@ static void io_queue_sqe_arm_apoll(struct io_kiocb *req) { struct io_kiocb *linked_timeout = io_prep_linked_timeout(req); - switch (io_arm_poll_handler(req)) { + switch (io_arm_poll_handler(req, 0)) { case IO_APOLL_READY: io_req_task_queue(req); break; @@ -7081,8 +7487,12 @@ static void io_queue_sqe_arm_apoll(struct io_kiocb *req) * Queued up for async execution, worker will release * submit reference when the iocb is actually submitted. */ + io_kbuf_recycle(req); io_queue_async_work(req, NULL); break; + case IO_APOLL_OK: + io_kbuf_recycle(req); + break; } if (linked_timeout) @@ -7281,7 +7691,7 @@ static int io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req, ret = io_init_req(ctx, req, sqe); if (unlikely(ret)) { - trace_io_uring_req_failed(sqe, ret); + trace_io_uring_req_failed(sqe, ctx, req, ret); /* fail even hard links since we don't submit */ if (link->head) { @@ -7308,7 +7718,7 @@ static int io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req, } /* don't need @sqe from now on */ - trace_io_uring_submit_sqe(ctx, req, req->opcode, req->user_data, + trace_io_uring_submit_sqe(ctx, req, req->user_data, req->opcode, req->flags, true, ctx->flags & IORING_SETUP_SQPOLL); @@ -7451,8 +7861,14 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr) } /* will complete beyond this point, count as submitted */ submitted++; - if (io_submit_sqe(ctx, req, sqe)) - break; + if (io_submit_sqe(ctx, req, sqe)) { + /* + * Continue submitting even for sqe failure if the + * ring was setup with IORING_SETUP_SUBMIT_ALL + */ + if (!(ctx->flags & IORING_SETUP_SUBMIT_ALL)) + break; + } } while (submitted < nr); if (unlikely(submitted != nr)) { @@ -7519,7 +7935,13 @@ static int __io_sq_thread(struct io_ring_ctx *ctx, bool cap_entries) !(ctx->flags & IORING_SETUP_R_DISABLED)) ret = io_submit_sqes(ctx, to_submit); mutex_unlock(&ctx->uring_lock); - +#ifdef CONFIG_NET_RX_BUSY_POLL + spin_lock(&ctx->napi_lock); + if (!list_empty(&ctx->napi_list) && + io_napi_busy_loop(&ctx->napi_list)) + ++ret; + spin_unlock(&ctx->napi_lock); +#endif if (to_submit && wq_has_sleeper(&ctx->sqo_sq_wait)) wake_up(&ctx->sqo_sq_wait); if (creds) @@ -7650,6 +8072,9 @@ struct io_wait_queue { struct io_ring_ctx *ctx; unsigned cq_tail; unsigned nr_timeouts; +#ifdef CONFIG_NET_RX_BUSY_POLL + unsigned busy_poll_to; +#endif }; static inline bool io_should_wake(struct io_wait_queue *iowq) @@ -7684,11 +8109,11 @@ static int io_run_task_work_sig(void) { if (io_run_task_work()) return 1; - if (!signal_pending(current)) - return 0; if (test_thread_flag(TIF_NOTIFY_SIGNAL)) return -ERESTARTSYS; - return -EINTR; + if (task_sigpending(current)) + return -EINTR; + return 0; } /* when returns >0, the caller should retry */ @@ -7711,6 +8136,87 @@ static inline int io_cqring_wait_schedule(struct io_ring_ctx *ctx, return 1; } +#ifdef CONFIG_NET_RX_BUSY_POLL +static void io_adjust_busy_loop_timeout(struct timespec64 *ts, + struct io_wait_queue *iowq) +{ + unsigned busy_poll_to = READ_ONCE(sysctl_net_busy_poll); + struct timespec64 pollto = ns_to_timespec64(1000 * (s64)busy_poll_to); + + if (timespec64_compare(ts, &pollto) > 0) { + *ts = timespec64_sub(*ts, pollto); + iowq->busy_poll_to = busy_poll_to; + } else { + u64 to = timespec64_to_ns(ts); + + do_div(to, 1000); + iowq->busy_poll_to = to; + ts->tv_sec = 0; + ts->tv_nsec = 0; + } +} + +static inline bool io_busy_loop_timeout(unsigned long start_time, + unsigned long bp_usec) +{ + if (bp_usec) { + unsigned long end_time = start_time + bp_usec; + unsigned long now = busy_loop_current_time(); + + return time_after(now, end_time); + } + return true; +} + +static bool io_busy_loop_end(void *p, unsigned long start_time) +{ + struct io_wait_queue *iowq = p; + + return signal_pending(current) || + io_should_wake(iowq) || + io_busy_loop_timeout(start_time, iowq->busy_poll_to); +} + +static void io_blocking_napi_busy_loop(struct list_head *napi_list, + struct io_wait_queue *iowq) +{ + unsigned long start_time = + list_is_singular(napi_list) ? 0 : + busy_loop_current_time(); + + do { + if (list_is_singular(napi_list)) { + struct napi_entry *ne = + list_first_entry(napi_list, + struct napi_entry, list); + + napi_busy_loop(ne->napi_id, io_busy_loop_end, iowq, + true, BUSY_POLL_BUDGET); + io_check_napi_entry_timeout(ne); + break; + } + } while (io_napi_busy_loop(napi_list) && + !io_busy_loop_end(iowq, start_time)); +} + +static void io_putback_napi_list(struct io_ring_ctx *ctx, + struct list_head *napi_list) +{ + struct napi_entry *cne, *lne; + + spin_lock(&ctx->napi_lock); + list_for_each_entry(cne, &ctx->napi_list, list) + list_for_each_entry(lne, napi_list, list) + if (cne->napi_id == lne->napi_id) { + list_del(&lne->list); + kfree(lne); + break; + } + list_splice(napi_list, &ctx->napi_list); + spin_unlock(&ctx->napi_lock); +} +#endif /* CONFIG_NET_RX_BUSY_POLL */ + /* * Wait until events become available, if we don't already have some. The * application must reap them itself, as they reside on the shared cq ring. @@ -7723,6 +8229,9 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, struct io_rings *rings = ctx->rings; ktime_t timeout = KTIME_MAX; int ret; +#ifdef CONFIG_NET_RX_BUSY_POLL + LIST_HEAD(local_napi_list); +#endif do { io_cqring_overflow_flush(ctx); @@ -7732,14 +8241,6 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, break; } while (1); - if (uts) { - struct timespec64 ts; - - if (get_timespec64(&ts, uts)) - return -EFAULT; - timeout = ktime_add_ns(timespec64_to_ktime(ts), ktime_get_ns()); - } - if (sig) { #ifdef CONFIG_COMPAT if (in_compat_syscall()) @@ -7753,6 +8254,30 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, return ret; } +#ifdef CONFIG_NET_RX_BUSY_POLL + iowq.busy_poll_to = 0; + if (!(ctx->flags & IORING_SETUP_SQPOLL)) { + spin_lock(&ctx->napi_lock); + list_splice_init(&ctx->napi_list, &local_napi_list); + spin_unlock(&ctx->napi_lock); + } +#endif + if (uts) { + struct timespec64 ts; + + if (get_timespec64(&ts, uts)) + return -EFAULT; +#ifdef CONFIG_NET_RX_BUSY_POLL + if (!list_empty(&local_napi_list)) + io_adjust_busy_loop_timeout(&ts, &iowq); +#endif + timeout = ktime_add_ns(timespec64_to_ktime(ts), ktime_get_ns()); + } +#ifdef CONFIG_NET_RX_BUSY_POLL + else if (!list_empty(&local_napi_list)) + iowq.busy_poll_to = READ_ONCE(sysctl_net_busy_poll); +#endif + init_waitqueue_func_entry(&iowq.wq, io_wake_function); iowq.wq.private = current; INIT_LIST_HEAD(&iowq.wq.entry); @@ -7761,6 +8286,12 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, iowq.cq_tail = READ_ONCE(ctx->rings->cq.head) + min_events; trace_io_uring_cqring_wait(ctx, min_events); +#ifdef CONFIG_NET_RX_BUSY_POLL + if (iowq.busy_poll_to) + io_blocking_napi_busy_loop(&local_napi_list, &iowq); + if (!list_empty(&local_napi_list)) + io_putback_napi_list(ctx, &local_napi_list); +#endif do { /* if we can't even flush overflow, don't wait for more */ if (!io_cqring_overflow_flush(ctx)) { @@ -8749,8 +9280,16 @@ static __cold int io_uring_alloc_task_context(struct task_struct *task, if (unlikely(!tctx)) return -ENOMEM; + tctx->registered_rings = kcalloc(IO_RINGFD_REG_MAX, + sizeof(struct file *), GFP_KERNEL); + if (unlikely(!tctx->registered_rings)) { + kfree(tctx); + return -ENOMEM; + } + ret = percpu_counter_init(&tctx->inflight, 0, GFP_KERNEL); if (unlikely(ret)) { + kfree(tctx->registered_rings); kfree(tctx); return ret; } @@ -8759,6 +9298,7 @@ static __cold int io_uring_alloc_task_context(struct task_struct *task, if (IS_ERR(tctx->io_wq)) { ret = PTR_ERR(tctx->io_wq); percpu_counter_destroy(&tctx->inflight); + kfree(tctx->registered_rings); kfree(tctx); return ret; } @@ -8783,6 +9323,7 @@ void __io_uring_free(struct task_struct *tsk) WARN_ON_ONCE(tctx->io_wq); WARN_ON_ONCE(tctx->cached_refs); + kfree(tctx->registered_rings); percpu_counter_destroy(&tctx->inflight); kfree(tctx); tsk->io_uring = NULL; @@ -9359,33 +9900,55 @@ static int __io_sqe_buffers_update(struct io_ring_ctx *ctx, return done ? done : err; } -static int io_eventfd_register(struct io_ring_ctx *ctx, void __user *arg) +static int io_eventfd_register(struct io_ring_ctx *ctx, void __user *arg, + unsigned int eventfd_async) { + struct io_ev_fd *ev_fd; __s32 __user *fds = arg; int fd; - if (ctx->cq_ev_fd) + ev_fd = rcu_dereference_protected(ctx->io_ev_fd, + lockdep_is_held(&ctx->uring_lock)); + if (ev_fd) return -EBUSY; if (copy_from_user(&fd, fds, sizeof(*fds))) return -EFAULT; - ctx->cq_ev_fd = eventfd_ctx_fdget(fd); - if (IS_ERR(ctx->cq_ev_fd)) { - int ret = PTR_ERR(ctx->cq_ev_fd); + ev_fd = kmalloc(sizeof(*ev_fd), GFP_KERNEL); + if (!ev_fd) + return -ENOMEM; - ctx->cq_ev_fd = NULL; + ev_fd->cq_ev_fd = eventfd_ctx_fdget(fd); + if (IS_ERR(ev_fd->cq_ev_fd)) { + int ret = PTR_ERR(ev_fd->cq_ev_fd); + kfree(ev_fd); return ret; } - + ev_fd->eventfd_async = eventfd_async; + ctx->has_evfd = true; + rcu_assign_pointer(ctx->io_ev_fd, ev_fd); return 0; } +static void io_eventfd_put(struct rcu_head *rcu) +{ + struct io_ev_fd *ev_fd = container_of(rcu, struct io_ev_fd, rcu); + + eventfd_ctx_put(ev_fd->cq_ev_fd); + kfree(ev_fd); +} + static int io_eventfd_unregister(struct io_ring_ctx *ctx) { - if (ctx->cq_ev_fd) { - eventfd_ctx_put(ctx->cq_ev_fd); - ctx->cq_ev_fd = NULL; + struct io_ev_fd *ev_fd; + + ev_fd = rcu_dereference_protected(ctx->io_ev_fd, + lockdep_is_held(&ctx->uring_lock)); + if (ev_fd) { + ctx->has_evfd = false; + rcu_assign_pointer(ctx->io_ev_fd, NULL); + call_rcu(&ev_fd->rcu, io_eventfd_put); return 0; } @@ -9394,11 +9957,28 @@ static int io_eventfd_unregister(struct io_ring_ctx *ctx) static void io_destroy_buffers(struct io_ring_ctx *ctx) { - struct io_buffer *buf; - unsigned long index; + int i; + + for (i = 0; i < (1U << IO_BUFFERS_HASH_BITS); i++) { + struct list_head *list = &ctx->io_buffers[i]; - xa_for_each(&ctx->io_buffers, index, buf) - __io_remove_buffers(ctx, buf, index, -1U); + while (!list_empty(list)) { + struct io_buffer_list *bl; + + bl = list_first_entry(list, struct io_buffer_list, list); + __io_remove_buffers(ctx, bl, -1U); + list_del(&bl->list); + kfree(bl); + } + } + + while (!list_empty(&ctx->io_buffers_pages)) { + struct page *page; + + page = list_first_entry(&ctx->io_buffers_pages, struct page, lru); + list_del_init(&page->lru); + __free_page(page); + } } static void io_req_caches_free(struct io_ring_ctx *ctx) @@ -9429,6 +10009,18 @@ static void io_wait_rsrc_data(struct io_rsrc_data *data) wait_for_completion(&data->done); } +static void io_flush_apoll_cache(struct io_ring_ctx *ctx) +{ + struct async_poll *apoll; + + while (!list_empty(&ctx->apoll_cache)) { + apoll = list_first_entry(&ctx->apoll_cache, struct async_poll, + poll.wait.entry); + list_del(&apoll->poll.wait.entry); + kfree(apoll); + } +} + static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx) { io_sq_thread_finish(ctx); @@ -9450,8 +10042,9 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx) __io_sqe_files_unregister(ctx); if (ctx->rings) __io_cqring_overflow_flush(ctx, true); - mutex_unlock(&ctx->uring_lock); io_eventfd_unregister(ctx); + io_flush_apoll_cache(ctx); + mutex_unlock(&ctx->uring_lock); io_destroy_buffers(ctx); if (ctx->sq_creds) put_cred(ctx->sq_creds); @@ -9483,8 +10076,10 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx) io_req_caches_free(ctx); if (ctx->hash_map) io_wq_put_hash(ctx->hash_map); + io_free_napi_list(ctx); kfree(ctx->cancel_hash); kfree(ctx->dummy_ubuf); + kfree(ctx->io_buffers); kfree(ctx); } @@ -9983,6 +10578,139 @@ void __io_uring_cancel(bool cancel_all) io_uring_cancel_generic(cancel_all, NULL); } +void io_uring_unreg_ringfd(void) +{ + struct io_uring_task *tctx = current->io_uring; + int i; + + for (i = 0; i < IO_RINGFD_REG_MAX; i++) { + if (tctx->registered_rings[i]) { + fput(tctx->registered_rings[i]); + tctx->registered_rings[i] = NULL; + } + } +} + +static int io_ring_add_registered_fd(struct io_uring_task *tctx, int fd, + int start, int end) +{ + struct file *file; + int offset; + + for (offset = start; offset < end; offset++) { + offset = array_index_nospec(offset, IO_RINGFD_REG_MAX); + if (tctx->registered_rings[offset]) + continue; + + file = fget(fd); + if (!file) { + return -EBADF; + } else if (file->f_op != &io_uring_fops) { + fput(file); + return -EOPNOTSUPP; + } + tctx->registered_rings[offset] = file; + return offset; + } + + return -EBUSY; +} + +/* + * Register a ring fd to avoid fdget/fdput for each io_uring_enter() + * invocation. User passes in an array of struct io_uring_rsrc_update + * with ->data set to the ring_fd, and ->offset given for the desired + * index. If no index is desired, application may set ->offset == -1U + * and we'll find an available index. Returns number of entries + * successfully processed, or < 0 on error if none were processed. + */ +static int io_ringfd_register(struct io_ring_ctx *ctx, void __user *__arg, + unsigned nr_args) +{ + struct io_uring_rsrc_update __user *arg = __arg; + struct io_uring_rsrc_update reg; + struct io_uring_task *tctx; + int ret, i; + + if (!nr_args || nr_args > IO_RINGFD_REG_MAX) + return -EINVAL; + + mutex_unlock(&ctx->uring_lock); + ret = io_uring_add_tctx_node(ctx); + mutex_lock(&ctx->uring_lock); + if (ret) + return ret; + + tctx = current->io_uring; + for (i = 0; i < nr_args; i++) { + int start, end; + + if (copy_from_user(®, &arg[i], sizeof(reg))) { + ret = -EFAULT; + break; + } + + if (reg.offset == -1U) { + start = 0; + end = IO_RINGFD_REG_MAX; + } else { + if (reg.offset >= IO_RINGFD_REG_MAX) { + ret = -EINVAL; + break; + } + start = reg.offset; + end = start + 1; + } + + ret = io_ring_add_registered_fd(tctx, reg.data, start, end); + if (ret < 0) + break; + + reg.offset = ret; + if (copy_to_user(&arg[i], ®, sizeof(reg))) { + fput(tctx->registered_rings[reg.offset]); + tctx->registered_rings[reg.offset] = NULL; + ret = -EFAULT; + break; + } + } + + return i ? i : ret; +} + +static int io_ringfd_unregister(struct io_ring_ctx *ctx, void __user *__arg, + unsigned nr_args) +{ + struct io_uring_rsrc_update __user *arg = __arg; + struct io_uring_task *tctx = current->io_uring; + struct io_uring_rsrc_update reg; + int ret = 0, i; + + if (!nr_args || nr_args > IO_RINGFD_REG_MAX) + return -EINVAL; + if (!tctx) + return 0; + + for (i = 0; i < nr_args; i++) { + if (copy_from_user(®, &arg[i], sizeof(reg))) { + ret = -EFAULT; + break; + } + if (reg.offset >= IO_RINGFD_REG_MAX) { + ret = -EINVAL; + break; + } + + reg.offset = array_index_nospec(reg.offset, IO_RINGFD_REG_MAX); + if (tctx->registered_rings[reg.offset]) { + fput(tctx->registered_rings[reg.offset]); + tctx->registered_rings[reg.offset] = NULL; + } + } + + return i ? i : ret; +} + static void *io_uring_validate_mmap_request(struct file *file, loff_t pgoff, size_t sz) { @@ -10113,12 +10841,28 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit, io_run_task_work(); if (unlikely(flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP | - IORING_ENTER_SQ_WAIT | IORING_ENTER_EXT_ARG))) + IORING_ENTER_SQ_WAIT | IORING_ENTER_EXT_ARG | + IORING_ENTER_REGISTERED_RING))) return -EINVAL; - f = fdget(fd); - if (unlikely(!f.file)) - return -EBADF; + /* + * Ring fd has been registered via IORING_REGISTER_RING_FDS, we + * need only dereference our task private array to find it. + */ + if (flags & IORING_ENTER_REGISTERED_RING) { + struct io_uring_task *tctx = current->io_uring; + + if (!tctx || fd >= IO_RINGFD_REG_MAX) + return -EINVAL; + fd = array_index_nospec(fd, IO_RINGFD_REG_MAX); + f.file = tctx->registered_rings[fd]; + if (unlikely(!f.file)) + return -EBADF; + } else { + f = fdget(fd); + if (unlikely(!f.file)) + return -EBADF; + } ret = -EOPNOTSUPP; if (unlikely(f.file->f_op != &io_uring_fops)) @@ -10192,7 +10936,8 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit, out: percpu_ref_put(&ctx->refs); out_fput: - fdput(f); + if (!(flags & IORING_ENTER_REGISTERED_RING)) + fdput(f); return submitted ? submitted : ret; } @@ -10610,7 +11355,7 @@ static long io_uring_setup(u32 entries, struct io_uring_params __user *params) if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL | IORING_SETUP_SQ_AFF | IORING_SETUP_CQSIZE | IORING_SETUP_CLAMP | IORING_SETUP_ATTACH_WQ | - IORING_SETUP_R_DISABLED)) + IORING_SETUP_R_DISABLED | IORING_SETUP_SUBMIT_ALL)) return -EINVAL; return io_uring_create(entries, &p, params); @@ -10960,61 +11705,6 @@ err: return ret; } -static bool io_register_op_must_quiesce(int op) -{ - switch (op) { - case IORING_REGISTER_BUFFERS: - case IORING_UNREGISTER_BUFFERS: - case IORING_REGISTER_FILES: - case IORING_UNREGISTER_FILES: - case IORING_REGISTER_FILES_UPDATE: - case IORING_REGISTER_PROBE: - case IORING_REGISTER_PERSONALITY: - case IORING_UNREGISTER_PERSONALITY: - case IORING_REGISTER_FILES2: - case IORING_REGISTER_FILES_UPDATE2: - case IORING_REGISTER_BUFFERS2: - case IORING_REGISTER_BUFFERS_UPDATE: - case IORING_REGISTER_IOWQ_AFF: - case IORING_UNREGISTER_IOWQ_AFF: - case IORING_REGISTER_IOWQ_MAX_WORKERS: - return false; - default: - return true; - } -} - -static __cold int io_ctx_quiesce(struct io_ring_ctx *ctx) -{ - long ret; - - percpu_ref_kill(&ctx->refs); - - /* - * Drop uring mutex before waiting for references to exit. If another - * thread is currently inside io_uring_enter() it might need to grab the - * uring_lock to make progress. If we hold it here across the drain - * wait, then we can deadlock. It's safe to drop the mutex here, since - * no new references will come in after we've killed the percpu ref. - */ - mutex_unlock(&ctx->uring_lock); - do { - ret = wait_for_completion_interruptible_timeout(&ctx->ref_comp, HZ); - if (ret) { - ret = min(0L, ret); - break; - } - - ret = io_run_task_work_sig(); - io_req_caches_free(ctx); - } while (ret >= 0); - mutex_lock(&ctx->uring_lock); - - if (ret) - io_refs_resurrect(&ctx->refs, &ctx->ref_comp); - return ret; -} - static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode, void __user *arg, unsigned nr_args) __releases(ctx->uring_lock) @@ -11038,12 +11728,6 @@ static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode, return -EACCES; } - if (io_register_op_must_quiesce(opcode)) { - ret = io_ctx_quiesce(ctx); - if (ret) - return ret; - } - switch (opcode) { case IORING_REGISTER_BUFFERS: ret = io_sqe_buffers_register(ctx, arg, nr_args, NULL); @@ -11067,17 +11751,16 @@ static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode, ret = io_register_files_update(ctx, arg, nr_args); break; case IORING_REGISTER_EVENTFD: - case IORING_REGISTER_EVENTFD_ASYNC: ret = -EINVAL; if (nr_args != 1) break; - ret = io_eventfd_register(ctx, arg); - if (ret) + ret = io_eventfd_register(ctx, arg, 0); + break; + case IORING_REGISTER_EVENTFD_ASYNC: + ret = -EINVAL; + if (nr_args != 1) break; - if (opcode == IORING_REGISTER_EVENTFD_ASYNC) - ctx->eventfd_async = 1; - else - ctx->eventfd_async = 0; + ret = io_eventfd_register(ctx, arg, 1); break; case IORING_UNREGISTER_EVENTFD: ret = -EINVAL; @@ -11144,16 +11827,17 @@ static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode, break; ret = io_register_iowq_max_workers(ctx, arg); break; + case IORING_REGISTER_RING_FDS: + ret = io_ringfd_register(ctx, arg, nr_args); + break; + case IORING_UNREGISTER_RING_FDS: + ret = io_ringfd_unregister(ctx, arg, nr_args); + break; default: ret = -EINVAL; break; } - if (io_register_op_must_quiesce(opcode)) { - /* bring the ctx back to life */ - percpu_ref_reinit(&ctx->refs); - reinit_completion(&ctx->ref_comp); - } return ret; } @@ -11179,8 +11863,7 @@ SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode, mutex_lock(&ctx->uring_lock); ret = __io_uring_register(ctx, opcode, arg, nr_args); mutex_unlock(&ctx->uring_lock); - trace_io_uring_register(ctx, opcode, ctx->nr_user_files, ctx->nr_user_bufs, - ctx->cq_ev_fd != NULL, ret); + trace_io_uring_register(ctx, opcode, ctx->nr_user_files, ctx->nr_user_bufs, ret); out_fput: fdput(f); return ret; |