diff options
author | Linus Torvalds | 2021-02-27 08:29:02 -0800 |
---|---|---|
committer | Linus Torvalds | 2021-02-27 08:29:02 -0800 |
commit | 5695e51619745d4fe3ec2506a2f0cd982c5e27a4 (patch) | |
tree | 479e4e6e05f5fc175ed8168b47102c2eb0d2238e | |
parent | 5ceabb6078b80a8544ba86d6ee523ad755ae6d5e (diff) | |
parent | d6ce7f6761bf6d669d9c74ec5d3bd1bfe92380c5 (diff) |
Merge tag 'io_uring-worker.v3-2021-02-25' of git://git.kernel.dk/linux-block
Pull io_uring thread rewrite from Jens Axboe:
"This converts the io-wq workers to be forked off the tasks in question
instead of being kernel threads that assume various bits of the
original task identity.
This kills > 400 lines of code from io_uring/io-wq, and it's the worst
part of the code. We've had several bugs in this area, and the worry
is always that we could be missing some pieces for file types doing
unusual things (recent /dev/tty example comes to mind, userfaultfd
reads installing file descriptors is another fun one... - both of
which need special handling, and I bet it's not the last weird oddity
we'll find).
With these identical workers, we can have full confidence that we're
never missing anything. That, in itself, is a huge win. Outside of
that, it's also more efficient since we're not wasting space and code
on tracking state, or switching between different states.
I'm sure we're going to find little things to patch up after this
series, but testing has been pretty thorough, from the usual
regression suite to production. Any issue that may crop up should be
manageable.
There's also a nice series of further reductions we can do on top of
this, but I wanted to get the meat of it out sooner rather than later.
The general worry here isn't that it's fundamentally broken. Most of
the little issues we've found over the last week have been related to
just changes in how thread startup/exit is done, since that's the main
difference between using kthreads and these kinds of threads. In fact,
if all goes according to plan, I want to get this into the 5.10 and
5.11 stable branches as well.
That said, the changes outside of io_uring/io-wq are:
- arch setup, simple one-liner to each arch copy_thread()
implementation.
- Removal of net and proc restrictions for io_uring, they are no
longer needed or useful"
* tag 'io_uring-worker.v3-2021-02-25' of git://git.kernel.dk/linux-block: (30 commits)
io-wq: remove now unused IO_WQ_BIT_ERROR
io_uring: fix SQPOLL thread handling over exec
io-wq: improve manager/worker handling over exec
io_uring: ensure SQPOLL startup is triggered before error shutdown
io-wq: make buffered file write hashed work map per-ctx
io-wq: fix race around io_worker grabbing
io-wq: fix races around manager/worker creation and task exit
io_uring: ensure io-wq context is always destroyed for tasks
arch: ensure parisc/powerpc handle PF_IO_WORKER in copy_thread()
io_uring: cleanup ->user usage
io-wq: remove nr_process accounting
io_uring: flag new native workers with IORING_FEAT_NATIVE_WORKERS
net: remove cmsg restriction from io_uring based send/recvmsg calls
Revert "proc: don't allow async path resolution of /proc/self components"
Revert "proc: don't allow async path resolution of /proc/thread-self components"
io_uring: move SQPOLL thread io-wq forked worker
io-wq: make io_wq_fork_thread() available to other users
io-wq: only remove worker from free_list, if it was there
io_uring: remove io_identity
io_uring: remove any grabbing of context
...
38 files changed, 694 insertions, 1114 deletions
diff --git a/arch/alpha/kernel/process.c b/arch/alpha/kernel/process.c index 6c71554206cc..5112ab996394 100644 --- a/arch/alpha/kernel/process.c +++ b/arch/alpha/kernel/process.c @@ -249,7 +249,7 @@ int copy_thread(unsigned long clone_flags, unsigned long usp, childti->pcb.ksp = (unsigned long) childstack; childti->pcb.flags = 1; /* set FEN, clear everything else */ - if (unlikely(p->flags & PF_KTHREAD)) { + if (unlikely(p->flags & (PF_KTHREAD | PF_IO_WORKER))) { /* kernel thread */ memset(childstack, 0, sizeof(struct switch_stack) + sizeof(struct pt_regs)); diff --git a/arch/arc/kernel/process.c b/arch/arc/kernel/process.c index 37f724ad5e39..d838d0d57696 100644 --- a/arch/arc/kernel/process.c +++ b/arch/arc/kernel/process.c @@ -191,7 +191,7 @@ int copy_thread(unsigned long clone_flags, unsigned long usp, childksp[0] = 0; /* fp */ childksp[1] = (unsigned long)ret_from_fork; /* blink */ - if (unlikely(p->flags & PF_KTHREAD)) { + if (unlikely(p->flags & (PF_KTHREAD | PF_IO_WORKER))) { memset(c_regs, 0, sizeof(struct pt_regs)); c_callee->r13 = kthread_arg; diff --git a/arch/arm/kernel/process.c b/arch/arm/kernel/process.c index ee3aee69e444..5199a2bb4111 100644 --- a/arch/arm/kernel/process.c +++ b/arch/arm/kernel/process.c @@ -243,7 +243,7 @@ int copy_thread(unsigned long clone_flags, unsigned long stack_start, thread->cpu_domain = get_domain(); #endif - if (likely(!(p->flags & PF_KTHREAD))) { + if (likely(!(p->flags & (PF_KTHREAD | PF_IO_WORKER)))) { *childregs = *current_pt_regs(); childregs->ARM_r0 = 0; if (stack_start) diff --git a/arch/arm64/kernel/process.c b/arch/arm64/kernel/process.c index 4cc1ccc8d6ab..325c83b1a24d 100644 --- a/arch/arm64/kernel/process.c +++ b/arch/arm64/kernel/process.c @@ -398,7 +398,7 @@ int copy_thread(unsigned long clone_flags, unsigned long stack_start, ptrauth_thread_init_kernel(p); - if (likely(!(p->flags & PF_KTHREAD))) { + if (likely(!(p->flags & (PF_KTHREAD | PF_IO_WORKER)))) { *childregs = *current_pt_regs(); childregs->regs[0] = 0; diff --git a/arch/csky/kernel/process.c b/arch/csky/kernel/process.c index 69af6bc87e64..3d0ca22cd0e2 100644 --- a/arch/csky/kernel/process.c +++ b/arch/csky/kernel/process.c @@ -49,7 +49,7 @@ int copy_thread(unsigned long clone_flags, /* setup thread.sp for switch_to !!! */ p->thread.sp = (unsigned long)childstack; - if (unlikely(p->flags & PF_KTHREAD)) { + if (unlikely(p->flags & (PF_KTHREAD | PF_IO_WORKER))) { memset(childregs, 0, sizeof(struct pt_regs)); childstack->r15 = (unsigned long) ret_from_kernel_thread; childstack->r10 = kthread_arg; diff --git a/arch/h8300/kernel/process.c b/arch/h8300/kernel/process.c index bc1364db58fe..46b1342ce515 100644 --- a/arch/h8300/kernel/process.c +++ b/arch/h8300/kernel/process.c @@ -112,7 +112,7 @@ int copy_thread(unsigned long clone_flags, unsigned long usp, childregs = (struct pt_regs *) (THREAD_SIZE + task_stack_page(p)) - 1; - if (unlikely(p->flags & PF_KTHREAD)) { + if (unlikely(p->flags & (PF_KTHREAD | PF_IO_WORKER))) { memset(childregs, 0, sizeof(struct pt_regs)); childregs->retpc = (unsigned long) ret_from_kernel_thread; childregs->er4 = topstk; /* arg */ diff --git a/arch/hexagon/kernel/process.c b/arch/hexagon/kernel/process.c index 6a980cba7b29..c61165c99ae0 100644 --- a/arch/hexagon/kernel/process.c +++ b/arch/hexagon/kernel/process.c @@ -73,7 +73,7 @@ int copy_thread(unsigned long clone_flags, unsigned long usp, unsigned long arg, sizeof(*ss)); ss->lr = (unsigned long)ret_from_fork; p->thread.switch_sp = ss; - if (unlikely(p->flags & PF_KTHREAD)) { + if (unlikely(p->flags & (PF_KTHREAD | PF_IO_WORKER))) { memset(childregs, 0, sizeof(struct pt_regs)); /* r24 <- fn, r25 <- arg */ ss->r24 = usp; diff --git a/arch/ia64/kernel/process.c b/arch/ia64/kernel/process.c index 4ebbfa076a26..7e1a1525e202 100644 --- a/arch/ia64/kernel/process.c +++ b/arch/ia64/kernel/process.c @@ -338,7 +338,7 @@ copy_thread(unsigned long clone_flags, unsigned long user_stack_base, ia64_drop_fpu(p); /* don't pick up stale state from a CPU's fph */ - if (unlikely(p->flags & PF_KTHREAD)) { + if (unlikely(p->flags & (PF_KTHREAD | PF_IO_WORKER))) { if (unlikely(!user_stack_base)) { /* fork_idle() called us */ return 0; diff --git a/arch/m68k/kernel/process.c b/arch/m68k/kernel/process.c index 08359a6e058f..da83cc83e791 100644 --- a/arch/m68k/kernel/process.c +++ b/arch/m68k/kernel/process.c @@ -157,7 +157,7 @@ int copy_thread(unsigned long clone_flags, unsigned long usp, unsigned long arg, */ p->thread.fs = get_fs().seg; - if (unlikely(p->flags & PF_KTHREAD)) { + if (unlikely(p->flags & (PF_KTHREAD | PF_IO_WORKER))) { /* kernel thread */ memset(frame, 0, sizeof(struct fork_frame)); frame->regs.sr = PS_S; diff --git a/arch/microblaze/kernel/process.c b/arch/microblaze/kernel/process.c index 657c2beb665e..62aa237180b6 100644 --- a/arch/microblaze/kernel/process.c +++ b/arch/microblaze/kernel/process.c @@ -59,7 +59,7 @@ int copy_thread(unsigned long clone_flags, unsigned long usp, unsigned long arg, struct pt_regs *childregs = task_pt_regs(p); struct thread_info *ti = task_thread_info(p); - if (unlikely(p->flags & PF_KTHREAD)) { + if (unlikely(p->flags & (PF_KTHREAD | PF_IO_WORKER))) { /* if we're creating a new kernel thread then just zeroing all * the registers. That's OK for a brand new thread.*/ memset(childregs, 0, sizeof(struct pt_regs)); diff --git a/arch/mips/kernel/process.c b/arch/mips/kernel/process.c index af4c862ec5ff..7efa0d1a4c2b 100644 --- a/arch/mips/kernel/process.c +++ b/arch/mips/kernel/process.c @@ -120,7 +120,7 @@ int copy_thread(unsigned long clone_flags, unsigned long usp, /* Put the stack after the struct pt_regs. */ childksp = (unsigned long) childregs; p->thread.cp0_status = (read_c0_status() & ~(ST0_CU2|ST0_CU1)) | ST0_KERNEL_CUMASK; - if (unlikely(p->flags & PF_KTHREAD)) { + if (unlikely(p->flags & (PF_KTHREAD | PF_IO_WORKER))) { /* kernel thread */ unsigned long status = p->thread.cp0_status; memset(childregs, 0, sizeof(struct pt_regs)); diff --git a/arch/nds32/kernel/process.c b/arch/nds32/kernel/process.c index e01ad5d17224..c1327e552ec6 100644 --- a/arch/nds32/kernel/process.c +++ b/arch/nds32/kernel/process.c @@ -156,7 +156,7 @@ int copy_thread(unsigned long clone_flags, unsigned long stack_start, memset(&p->thread.cpu_context, 0, sizeof(struct cpu_context)); - if (unlikely(p->flags & PF_KTHREAD)) { + if (unlikely(p->flags & (PF_KTHREAD | PF_IO_WORKER))) { memset(childregs, 0, sizeof(struct pt_regs)); /* kernel thread fn */ p->thread.cpu_context.r6 = stack_start; diff --git a/arch/nios2/kernel/process.c b/arch/nios2/kernel/process.c index 50b4eb19a6cc..c5f916ca6845 100644 --- a/arch/nios2/kernel/process.c +++ b/arch/nios2/kernel/process.c @@ -109,7 +109,7 @@ int copy_thread(unsigned long clone_flags, unsigned long usp, unsigned long arg, struct switch_stack *childstack = ((struct switch_stack *)childregs) - 1; - if (unlikely(p->flags & PF_KTHREAD)) { + if (unlikely(p->flags & (PF_KTHREAD | PF_IO_WORKER))) { memset(childstack, 0, sizeof(struct switch_stack) + sizeof(struct pt_regs)); diff --git a/arch/openrisc/kernel/process.c b/arch/openrisc/kernel/process.c index 181448f74316..eb62429681fc 100644 --- a/arch/openrisc/kernel/process.c +++ b/arch/openrisc/kernel/process.c @@ -174,7 +174,7 @@ copy_thread(unsigned long clone_flags, unsigned long usp, unsigned long arg, sp -= sizeof(struct pt_regs); kregs = (struct pt_regs *)sp; - if (unlikely(p->flags & PF_KTHREAD)) { + if (unlikely(p->flags & (PF_KTHREAD | PF_IO_WORKER))) { memset(kregs, 0, sizeof(struct pt_regs)); kregs->gpr[20] = usp; /* fn, kernel thread */ kregs->gpr[22] = arg; diff --git a/arch/parisc/kernel/process.c b/arch/parisc/kernel/process.c index fda1c1a6a444..b144fbe29bc1 100644 --- a/arch/parisc/kernel/process.c +++ b/arch/parisc/kernel/process.c @@ -200,7 +200,7 @@ copy_thread(unsigned long clone_flags, unsigned long usp, extern void * const ret_from_kernel_thread; extern void * const child_return; - if (unlikely(p->flags & PF_KTHREAD)) { + if (unlikely(p->flags & (PF_KTHREAD | PF_IO_WORKER))) { /* kernel thread */ memset(cregs, 0, sizeof(struct pt_regs)); if (!usp) /* idle thread */ diff --git a/arch/powerpc/kernel/process.c b/arch/powerpc/kernel/process.c index 924d023dad0a..3231c2df9e26 100644 --- a/arch/powerpc/kernel/process.c +++ b/arch/powerpc/kernel/process.c @@ -1670,7 +1670,7 @@ int copy_thread(unsigned long clone_flags, unsigned long usp, /* Copy registers */ sp -= sizeof(struct pt_regs); childregs = (struct pt_regs *) sp; - if (unlikely(p->flags & PF_KTHREAD)) { + if (unlikely(p->flags & (PF_KTHREAD | PF_IO_WORKER))) { /* kernel thread */ memset(childregs, 0, sizeof(struct pt_regs)); childregs->gpr[1] = sp + sizeof(struct pt_regs); diff --git a/arch/riscv/kernel/process.c b/arch/riscv/kernel/process.c index 19f4688f2f36..6f728e731bed 100644 --- a/arch/riscv/kernel/process.c +++ b/arch/riscv/kernel/process.c @@ -124,7 +124,7 @@ int copy_thread(unsigned long clone_flags, unsigned long usp, unsigned long arg, struct pt_regs *childregs = task_pt_regs(p); /* p->thread holds context to be restored by __switch_to() */ - if (unlikely(p->flags & PF_KTHREAD)) { + if (unlikely(p->flags & (PF_KTHREAD | PF_IO_WORKER))) { /* Kernel thread */ memset(childregs, 0, sizeof(struct pt_regs)); childregs->gp = gp_in_global; diff --git a/arch/s390/kernel/process.c b/arch/s390/kernel/process.c index 367bd000f6d1..e20bed1ed34a 100644 --- a/arch/s390/kernel/process.c +++ b/arch/s390/kernel/process.c @@ -130,7 +130,7 @@ int copy_thread(unsigned long clone_flags, unsigned long new_stackp, frame->sf.gprs[9] = (unsigned long)frame; /* Store access registers to kernel stack of new process. */ - if (unlikely(p->flags & PF_KTHREAD)) { + if (unlikely(p->flags & (PF_KTHREAD | PF_IO_WORKER))) { /* kernel thread */ memset(&frame->childregs, 0, sizeof(struct pt_regs)); frame->childregs.psw.mask = PSW_KERNEL_BITS | PSW_MASK_DAT | diff --git a/arch/sh/kernel/process_32.c b/arch/sh/kernel/process_32.c index 80a5d1c66a51..1aa508eb0823 100644 --- a/arch/sh/kernel/process_32.c +++ b/arch/sh/kernel/process_32.c @@ -114,7 +114,7 @@ int copy_thread(unsigned long clone_flags, unsigned long usp, unsigned long arg, childregs = task_pt_regs(p); p->thread.sp = (unsigned long) childregs; - if (unlikely(p->flags & PF_KTHREAD)) { + if (unlikely(p->flags & (PF_KTHREAD | PF_IO_WORKER))) { memset(childregs, 0, sizeof(struct pt_regs)); p->thread.pc = (unsigned long) ret_from_kernel_thread; childregs->regs[4] = arg; diff --git a/arch/sparc/kernel/process_32.c b/arch/sparc/kernel/process_32.c index 7649b14d69f8..b91e88058e0c 100644 --- a/arch/sparc/kernel/process_32.c +++ b/arch/sparc/kernel/process_32.c @@ -309,7 +309,7 @@ int copy_thread(unsigned long clone_flags, unsigned long sp, unsigned long arg, ti->ksp = (unsigned long) new_stack; p->thread.kregs = childregs; - if (unlikely(p->flags & PF_KTHREAD)) { + if (unlikely(p->flags & (PF_KTHREAD | PF_IO_WORKER))) { extern int nwindows; unsigned long psr; memset(new_stack, 0, STACKFRAME_SZ + TRACEREG_SZ); diff --git a/arch/sparc/kernel/process_64.c b/arch/sparc/kernel/process_64.c index 6f8c7822fc06..7afd0a859a78 100644 --- a/arch/sparc/kernel/process_64.c +++ b/arch/sparc/kernel/process_64.c @@ -597,7 +597,7 @@ int copy_thread(unsigned long clone_flags, unsigned long sp, unsigned long arg, sizeof(struct sparc_stackf)); t->fpsaved[0] = 0; - if (unlikely(p->flags & PF_KTHREAD)) { + if (unlikely(p->flags & (PF_KTHREAD | PF_IO_WORKER))) { memset(child_trap_frame, 0, child_stack_sz); __thread_flag_byte_ptr(t)[TI_FLAG_BYTE_CWP] = (current_pt_regs()->tstate + 1) & TSTATE_CWP; diff --git a/arch/um/kernel/process.c b/arch/um/kernel/process.c index 81d508daf67c..c5011064b5dd 100644 --- a/arch/um/kernel/process.c +++ b/arch/um/kernel/process.c @@ -157,7 +157,7 @@ int copy_thread(unsigned long clone_flags, unsigned long sp, unsigned long arg, struct task_struct * p, unsigned long tls) { void (*handler)(void); - int kthread = current->flags & PF_KTHREAD; + int kthread = current->flags & (PF_KTHREAD | PF_IO_WORKER); int ret = 0; p->thread = (struct thread_struct) INIT_THREAD; diff --git a/arch/x86/kernel/process.c b/arch/x86/kernel/process.c index 145a7ac0c19a..9c214d7085a4 100644 --- a/arch/x86/kernel/process.c +++ b/arch/x86/kernel/process.c @@ -161,7 +161,7 @@ int copy_thread(unsigned long clone_flags, unsigned long sp, unsigned long arg, #endif /* Kernel thread ? */ - if (unlikely(p->flags & PF_KTHREAD)) { + if (unlikely(p->flags & (PF_KTHREAD | PF_IO_WORKER))) { memset(childregs, 0, sizeof(struct pt_regs)); kthread_frame_init(frame, sp, arg); return 0; diff --git a/arch/xtensa/kernel/process.c b/arch/xtensa/kernel/process.c index 397a7de56377..9534ef515d74 100644 --- a/arch/xtensa/kernel/process.c +++ b/arch/xtensa/kernel/process.c @@ -217,7 +217,7 @@ int copy_thread(unsigned long clone_flags, unsigned long usp_thread_fn, p->thread.sp = (unsigned long)childregs; - if (!(p->flags & PF_KTHREAD)) { + if (!(p->flags & (PF_KTHREAD | PF_IO_WORKER))) { struct pt_regs *regs = current_pt_regs(); unsigned long usp = usp_thread_fn ? usp_thread_fn : regs->areg[1]; diff --git a/fs/io-wq.c b/fs/io-wq.c index c36bbcd823ce..44e20248805a 100644 --- a/fs/io-wq.c +++ b/fs/io-wq.c @@ -13,13 +13,9 @@ #include <linux/sched/mm.h> #include <linux/percpu.h> #include <linux/slab.h> -#include <linux/kthread.h> #include <linux/rculist_nulls.h> -#include <linux/fs_struct.h> -#include <linux/task_work.h> -#include <linux/blk-cgroup.h> -#include <linux/audit.h> #include <linux/cpu.h> +#include <linux/tracehook.h> #include "../kernel/sched/sched.h" #include "io-wq.h" @@ -36,7 +32,6 @@ enum { enum { IO_WQ_BIT_EXIT = 0, /* wq exiting */ - IO_WQ_BIT_ERROR = 1, /* error on setup */ }; enum { @@ -57,14 +52,12 @@ struct io_worker { struct io_wq_work *cur_work; spinlock_t lock; - struct rcu_head rcu; - struct mm_struct *mm; -#ifdef CONFIG_BLK_CGROUP - struct cgroup_subsys_state *blkcg_css; -#endif const struct cred *cur_creds; const struct cred *saved_creds; - struct nsproxy *restore_nsproxy; + + struct completion ref_done; + + struct rcu_head rcu; }; #if BITS_PER_LONG == 64 @@ -93,7 +86,6 @@ struct io_wqe { struct { raw_spinlock_t lock; struct io_wq_work_list work_list; - unsigned long hash_map; unsigned flags; } ____cacheline_aligned_in_smp; @@ -103,6 +95,8 @@ struct io_wqe { struct hlist_nulls_head free_list; struct list_head all_list; + struct wait_queue_entry wait; + struct io_wq *wq; struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS]; }; @@ -119,12 +113,15 @@ struct io_wq { struct task_struct *manager; struct user_struct *user; + + struct io_wq_hash *hash; + refcount_t refs; struct completion done; struct hlist_node cpuhp_node; - refcount_t use_refs; + pid_t task_pid; }; static enum cpuhp_state io_wq_online; @@ -137,62 +134,7 @@ static bool io_worker_get(struct io_worker *worker) static void io_worker_release(struct io_worker *worker) { if (refcount_dec_and_test(&worker->ref)) - wake_up_process(worker->task); -} - -/* - * Note: drops the wqe->lock if returning true! The caller must re-acquire - * the lock in that case. Some callers need to restart handling if this - * happens, so we can't just re-acquire the lock on behalf of the caller. - */ -static bool __io_worker_unuse(struct io_wqe *wqe, struct io_worker *worker) -{ - bool dropped_lock = false; - - if (worker->saved_creds) { - revert_creds(worker->saved_creds); - worker->cur_creds = worker->saved_creds = NULL; - } - - if (current->files) { - __acquire(&wqe->lock); - raw_spin_unlock_irq(&wqe->lock); - dropped_lock = true; - - task_lock(current); - current->files = NULL; - current->nsproxy = worker->restore_nsproxy; - task_unlock(current); - } - - if (current->fs) - current->fs = NULL; - - /* - * If we have an active mm, we need to drop the wq lock before unusing - * it. If we do, return true and let the caller retry the idle loop. - */ - if (worker->mm) { - if (!dropped_lock) { - __acquire(&wqe->lock); - raw_spin_unlock_irq(&wqe->lock); - dropped_lock = true; - } - __set_current_state(TASK_RUNNING); - kthread_unuse_mm(worker->mm); - mmput(worker->mm); - worker->mm = NULL; - } - -#ifdef CONFIG_BLK_CGROUP - if (worker->blkcg_css) { - kthread_associate_blkcg(NULL); - worker->blkcg_css = NULL; - } -#endif - if (current->signal->rlim[RLIMIT_FSIZE].rlim_cur != RLIM_INFINITY) - current->signal->rlim[RLIMIT_FSIZE].rlim_cur = RLIM_INFINITY; - return dropped_lock; + complete(&worker->ref_done); } static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe, @@ -204,9 +146,10 @@ static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe, return &wqe->acct[IO_WQ_ACCT_BOUND]; } -static inline struct io_wqe_acct *io_wqe_get_acct(struct io_wqe *wqe, - struct io_worker *worker) +static inline struct io_wqe_acct *io_wqe_get_acct(struct io_worker *worker) { + struct io_wqe *wqe = worker->wqe; + if (worker->flags & IO_WORKER_F_BOUND) return &wqe->acct[IO_WQ_ACCT_BOUND]; @@ -216,39 +159,36 @@ static inline struct io_wqe_acct *io_wqe_get_acct(struct io_wqe *wqe, static void io_worker_exit(struct io_worker *worker) { struct io_wqe *wqe = worker->wqe; - struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker); + struct io_wqe_acct *acct = io_wqe_get_acct(worker); + unsigned flags; - /* - * If we're not at zero, someone else is holding a brief reference - * to the worker. Wait for that to go away. - */ - set_current_state(TASK_INTERRUPTIBLE); - if (!refcount_dec_and_test(&worker->ref)) - schedule(); - __set_current_state(TASK_RUNNING); + if (refcount_dec_and_test(&worker->ref)) + complete(&worker->ref_done); + wait_for_completion(&worker->ref_done); preempt_disable(); current->flags &= ~PF_IO_WORKER; - if (worker->flags & IO_WORKER_F_RUNNING) + flags = worker->flags; + worker->flags = 0; + if (flags & IO_WORKER_F_RUNNING) atomic_dec(&acct->nr_running); - if (!(worker->flags & IO_WORKER_F_BOUND)) - atomic_dec(&wqe->wq->user->processes); worker->flags = 0; preempt_enable(); + if (worker->saved_creds) { + revert_creds(worker->saved_creds); + worker->cur_creds = worker->saved_creds = NULL; + } + raw_spin_lock_irq(&wqe->lock); - hlist_nulls_del_rcu(&worker->nulls_node); + if (flags & IO_WORKER_F_FREE) + hlist_nulls_del_rcu(&worker->nulls_node); list_del_rcu(&worker->all_list); - if (__io_worker_unuse(wqe, worker)) { - __release(&wqe->lock); - raw_spin_lock_irq(&wqe->lock); - } acct->nr_workers--; raw_spin_unlock_irq(&wqe->lock); kfree_rcu(worker, rcu); - if (refcount_dec_and_test(&wqe->wq->refs)) - complete(&wqe->wq->done); + io_wq_put(wqe->wq); } static inline bool io_wqe_run_queue(struct io_wqe *wqe) @@ -306,33 +246,27 @@ static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct) wake_up_process(wqe->wq->manager); } -static void io_wqe_inc_running(struct io_wqe *wqe, struct io_worker *worker) +static void io_wqe_inc_running(struct io_worker *worker) { - struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker); + struct io_wqe_acct *acct = io_wqe_get_acct(worker); atomic_inc(&acct->nr_running); } -static void io_wqe_dec_running(struct io_wqe *wqe, struct io_worker *worker) +static void io_wqe_dec_running(struct io_worker *worker) __must_hold(wqe->lock) { - struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker); + struct io_wqe_acct *acct = io_wqe_get_acct(worker); + struct io_wqe *wqe = worker->wqe; if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe)) io_wqe_wake_worker(wqe, acct); } -static void io_worker_start(struct io_wqe *wqe, struct io_worker *worker) +static void io_worker_start(struct io_worker *worker) { - allow_kernel_signal(SIGINT); - - current->flags |= PF_IO_WORKER; - current->fs = NULL; - current->files = NULL; - worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING); - worker->restore_nsproxy = current->nsproxy; - io_wqe_inc_running(wqe, worker); + io_wqe_inc_running(worker); } /* @@ -357,19 +291,17 @@ static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker, worker_bound = (worker->flags & IO_WORKER_F_BOUND) != 0; work_bound = (work->flags & IO_WQ_WORK_UNBOUND) == 0; if (worker_bound != work_bound) { - io_wqe_dec_running(wqe, worker); + io_wqe_dec_running(worker); if (work_bound) { worker->flags |= IO_WORKER_F_BOUND; wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers--; wqe->acct[IO_WQ_ACCT_BOUND].nr_workers++; - atomic_dec(&wqe->wq->user->processes); } else { worker->flags &= ~IO_WORKER_F_BOUND; wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers++; wqe->acct[IO_WQ_ACCT_BOUND].nr_workers--; - atomic_inc(&wqe->wq->user->processes); } - io_wqe_inc_running(wqe, worker); + io_wqe_inc_running(worker); } } @@ -380,15 +312,17 @@ static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker, * retry the loop in that case (we changed task state), we don't regrab * the lock if we return success. */ -static bool __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker) +static void __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker) __must_hold(wqe->lock) { if (!(worker->flags & IO_WORKER_F_FREE)) { worker->flags |= IO_WORKER_F_FREE; hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list); } - - return __io_worker_unuse(wqe, worker); + if (worker->saved_creds) { + revert_creds(worker->saved_creds); + worker->cur_creds = worker->saved_creds = NULL; + } } static inline unsigned int io_get_work_hash(struct io_wq_work *work) @@ -396,14 +330,31 @@ static inline unsigned int io_get_work_hash(struct io_wq_work *work) return work->flags >> IO_WQ_HASH_SHIFT; } +static void io_wait_on_hash(struct io_wqe *wqe, unsigned int hash) +{ + struct io_wq *wq = wqe->wq; + + spin_lock(&wq->hash->wait.lock); + if (list_empty(&wqe->wait.entry)) { + __add_wait_queue(&wq->hash->wait, &wqe->wait); + if (!test_bit(hash, &wq->hash->map)) { + __set_current_state(TASK_RUNNING); + list_del_init(&wqe->wait.entry); + } + } + spin_unlock(&wq->hash->wait.lock); +} + static struct io_wq_work *io_get_next_work(struct io_wqe *wqe) __must_hold(wqe->lock) { struct io_wq_work_node *node, *prev; struct io_wq_work *work, *tail; - unsigned int hash; + unsigned int stall_hash = -1U; wq_list_for_each(node, prev, &wqe->work_list) { + unsigned int hash; + work = container_of(node, struct io_wq_work, list); /* not hashed, can run anytime */ @@ -412,111 +363,60 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe) return work; } - /* hashed, can run if not already running */ hash = io_get_work_hash(work); - if (!(wqe->hash_map & BIT(hash))) { - wqe->hash_map |= BIT(hash); - /* all items with this hash lie in [work, tail] */ - tail = wqe->hash_tail[hash]; + /* all items with this hash lie in [work, tail] */ + tail = wqe->hash_tail[hash]; + + /* hashed, can run if not already running */ + if (!test_and_set_bit(hash, &wqe->wq->hash->map)) { wqe->hash_tail[hash] = NULL; wq_list_cut(&wqe->work_list, &tail->list, prev); return work; } + if (stall_hash == -1U) + stall_hash = hash; + /* fast forward to a next hash, for-each will fix up @prev */ + node = &tail->list; } - return NULL; -} - -static void io_wq_switch_mm(struct io_worker *worker, struct io_wq_work *work) -{ - if (worker->mm) { - kthread_unuse_mm(worker->mm); - mmput(worker->mm); - worker->mm = NULL; + if (stall_hash != -1U) { + raw_spin_unlock(&wqe->lock); + io_wait_on_hash(wqe, stall_hash); + raw_spin_lock(&wqe->lock); } - if (mmget_not_zero(work->identity->mm)) { - kthread_use_mm(work->identity->mm); - worker->mm = work->identity->mm; - return; - } - - /* failed grabbing mm, ensure work gets cancelled */ - work->flags |= IO_WQ_WORK_CANCEL; + return NULL; } -static inline void io_wq_switch_blkcg(struct io_worker *worker, - struct io_wq_work *work) +static void io_flush_signals(void) { -#ifdef CONFIG_BLK_CGROUP - if (!(work->flags & IO_WQ_WORK_BLKCG)) - return; - if (work->identity->blkcg_css != worker->blkcg_css) { - kthread_associate_blkcg(work->identity->blkcg_css); - worker->blkcg_css = work->identity->blkcg_css; + if (unlikely(test_tsk_thread_flag(current, TIF_NOTIFY_SIGNAL))) { + if (current->task_works) + task_work_run(); + clear_tsk_thread_flag(current, TIF_NOTIFY_SIGNAL); } -#endif } static void io_wq_switch_creds(struct io_worker *worker, struct io_wq_work *work) { - const struct cred *old_creds = override_creds(work->identity->creds); + const struct cred *old_creds = override_creds(work->creds); - worker->cur_creds = work->identity->creds; + worker->cur_creds = work->creds; if (worker->saved_creds) put_cred(old_creds); /* creds set by previous switch */ else worker->saved_creds = old_creds; } -static void io_impersonate_work(struct io_worker *worker, - struct io_wq_work *work) -{ - if ((work->flags & IO_WQ_WORK_FILES) && - current->files != work->identity->files) { - task_lock(current); - current->files = work->identity->files; - current->nsproxy = work->identity->nsproxy; - task_unlock(current); - if (!work->identity->files) { - /* failed grabbing files, ensure work gets cancelled */ - work->flags |= IO_WQ_WORK_CANCEL; - } - } - if ((work->flags & IO_WQ_WORK_FS) && current->fs != work->identity->fs) - current->fs = work->identity->fs; - if ((work->flags & IO_WQ_WORK_MM) && work->identity->mm != worker->mm) - io_wq_switch_mm(worker, work); - if ((work->flags & IO_WQ_WORK_CREDS) && - worker->cur_creds != work->identity->creds) - io_wq_switch_creds(worker, work); - if (work->flags & IO_WQ_WORK_FSIZE) - current->signal->rlim[RLIMIT_FSIZE].rlim_cur = work->identity->fsize; - else if (current->signal->rlim[RLIMIT_FSIZE].rlim_cur != RLIM_INFINITY) - current->signal->rlim[RLIMIT_FSIZE].rlim_cur = RLIM_INFINITY; - io_wq_switch_blkcg(worker, work); -#ifdef CONFIG_AUDIT - current->loginuid = work->identity->loginuid; - current->sessionid = work->identity->sessionid; -#endif -} - static void io_assign_current_work(struct io_worker *worker, struct io_wq_work *work) { if (work) { - /* flush pending signals before assigning new work */ - if (signal_pending(current)) - flush_signals(current); + io_flush_signals(); cond_resched(); } -#ifdef CONFIG_AUDIT - current->loginuid = KUIDT_INIT(AUDIT_UID_UNSET); - current->sessionid = AUDIT_SID_UNSET; -#endif - spin_lock_irq(&worker->lock); worker->cur_work = work; spin_unlock_irq(&worker->lock); @@ -550,6 +450,7 @@ get_next: if (!work) break; io_assign_current_work(worker, work); + __set_current_state(TASK_RUNNING); /* handle a whole dependent link */ do { @@ -557,7 +458,8 @@ get_next: unsigned int hash = io_get_work_hash(work); next_hashed = wq_next_work(work); - io_impersonate_work(worker, work); + if (work->creds && worker->cur_creds != work->creds) + io_wq_switch_creds(worker, work); wq->do_work(work); io_assign_current_work(worker, NULL); @@ -572,8 +474,10 @@ get_next: io_wqe_enqueue(wqe, linked); if (hash != -1U && !next_hashed) { + clear_bit(hash, &wq->hash->map); + if (wq_has_sleeper(&wq->hash->wait)) + wake_up(&wq->hash->wait); raw_spin_lock_irq(&wqe->lock); - wqe->hash_map &= ~BIT_ULL(hash); wqe->flags &= ~IO_WQE_FLAG_STALLED; /* skip unnecessary unlock-lock wqe->lock */ if (!work) @@ -592,27 +496,23 @@ static int io_wqe_worker(void *data) struct io_wqe *wqe = worker->wqe; struct io_wq *wq = wqe->wq; - io_worker_start(wqe, worker); + io_worker_start(worker); while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) { set_current_state(TASK_INTERRUPTIBLE); loop: raw_spin_lock_irq(&wqe->lock); if (io_wqe_run_queue(wqe)) { - __set_current_state(TASK_RUNNING); io_worker_handle_work(worker); goto loop; } - /* drops the lock on success, retry */ - if (__io_worker_idle(wqe, worker)) { - __release(&wqe->lock); - goto loop; - } + __io_worker_idle(wqe, worker); raw_spin_unlock_irq(&wqe->lock); - if (signal_pending(current)) - flush_signals(current); + io_flush_signals(); if (schedule_timeout(WORKER_IDLE_TIMEOUT)) continue; + if (fatal_signal_pending(current)) + break; /* timed out, exit unless we're the fixed worker */ if (test_bit(IO_WQ_BIT_EXIT, &wq->state) || !(worker->flags & IO_WORKER_F_FIXED)) @@ -636,15 +536,16 @@ loop: */ void io_wq_worker_running(struct task_struct *tsk) { - struct io_worker *worker = kthread_data(tsk); - struct io_wqe *wqe = worker->wqe; + struct io_worker *worker = tsk->pf_io_worker; + if (!worker) + return; if (!(worker->flags & IO_WORKER_F_UP)) return; if (worker->flags & IO_WORKER_F_RUNNING) return; worker->flags |= IO_WORKER_F_RUNNING; - io_wqe_inc_running(wqe, worker); + io_wqe_inc_running(worker); } /* @@ -654,9 +555,10 @@ void io_wq_worker_running(struct task_struct *tsk) */ void io_wq_worker_sleeping(struct task_struct *tsk) { - struct io_worker *worker = kthread_data(tsk); - struct io_wqe *wqe = worker->wqe; + struct io_worker *worker = tsk->pf_io_worker; + if (!worker) + return; if (!(worker->flags & IO_WORKER_F_UP)) return; if (!(worker->flags & IO_WORKER_F_RUNNING)) @@ -664,32 +566,27 @@ void io_wq_worker_sleeping(struct task_struct *tsk) worker->flags &= ~IO_WORKER_F_RUNNING; - raw_spin_lock_irq(&wqe->lock); - io_wqe_dec_running(wqe, worker); - raw_spin_unlock_irq(&wqe->lock); + raw_spin_lock_irq(&worker->wqe->lock); + io_wqe_dec_running(worker); + raw_spin_unlock_irq(&worker->wqe->lock); } -static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index) +static int task_thread(void *data, int index) { + struct io_worker *worker = data; + struct io_wqe *wqe = worker->wqe; struct io_wqe_acct *acct = &wqe->acct[index]; - struct io_worker *worker; + struct io_wq *wq = wqe->wq; + char buf[TASK_COMM_LEN]; - worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node); - if (!worker) - return false; + sprintf(buf, "iou-wrk-%d", wq->task_pid); + set_task_comm(current, buf); - refcount_set(&worker->ref, 1); - worker->nulls_node.pprev = NULL; - worker->wqe = wqe; - spin_lock_init(&worker->lock); + current->pf_io_worker = worker; + worker->task = current; - worker->task = kthread_create_on_node(io_wqe_worker, worker, wqe->node, - "io_wqe_worker-%d/%d", index, wqe->node); - if (IS_ERR(worker->task)) { - kfree(worker); - return false; - } - kthread_bind_mask(worker->task, cpumask_of_node(wqe->node)); + set_cpus_allowed_ptr(current, cpumask_of_node(wqe->node)); + current->flags |= PF_NO_SETAFFINITY; raw_spin_lock_irq(&wqe->lock); hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list); @@ -702,11 +599,63 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index) acct->nr_workers++; raw_spin_unlock_irq(&wqe->lock); - if (index == IO_WQ_ACCT_UNBOUND) - atomic_inc(&wq->user->processes); + io_wqe_worker(data); + do_exit(0); +} + +static int task_thread_bound(void *data) +{ + return task_thread(data, IO_WQ_ACCT_BOUND); +} + +static int task_thread_unbound(void *data) +{ + return task_thread(data, IO_WQ_ACCT_UNBOUND); +} + +pid_t io_wq_fork_thread(int (*fn)(void *), void *arg) +{ + unsigned long flags = CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD| + CLONE_IO|SIGCHLD; + struct kernel_clone_args args = { + .flags = ((lower_32_bits(flags) | CLONE_VM | + CLONE_UNTRACED) & ~CSIGNAL), + .exit_signal = (lower_32_bits(flags) & CSIGNAL), + .stack = (unsigned long)fn, + .stack_size = (unsigned long)arg, + }; + + return kernel_clone(&args); +} + +static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index) +{ + struct io_worker *worker; + pid_t pid; + + __set_current_state(TASK_RUNNING); + + worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node); + if (!worker) + return false; + + refcount_set(&worker->ref, 1); + worker->nulls_node.pprev = NULL; + worker->wqe = wqe; + spin_lock_init(&worker->lock); + init_completion(&worker->ref_done); refcount_inc(&wq->refs); - wake_up_process(worker->task); + + if (index == IO_WQ_ACCT_BOUND) + pid = io_wq_fork_thread(task_thread_bound, worker); + else + pid = io_wq_fork_thread(task_thread_unbound, worker); + if (pid < 0) { + io_wq_put(wq); + kfree(worker); + return false; + } return true; } @@ -752,93 +701,57 @@ static bool io_wq_worker_wake(struct io_worker *worker, void *data) return false; } -/* - * Manager thread. Tasked with creating new workers, if we need them. - */ -static int io_wq_manager(void *data) +static void io_wq_check_workers(struct io_wq *wq) { - struct io_wq *wq = data; int node; - /* create fixed workers */ - refcount_set(&wq->refs, 1); for_each_node(node) { + struct io_wqe *wqe = wq->wqes[node]; + bool fork_worker[2] = { false, false }; + if (!node_online(node)) continue; - if (create_io_worker(wq, wq->wqes[node], IO_WQ_ACCT_BOUND)) - continue; - set_bit(IO_WQ_BIT_ERROR, &wq->state); - set_bit(IO_WQ_BIT_EXIT, &wq->state); - goto out; - } - - complete(&wq->done); - - while (!kthread_should_stop()) { - if (current->task_works) - task_work_run(); - - for_each_node(node) { - struct io_wqe *wqe = wq->wqes[node]; - bool fork_worker[2] = { false, false }; - - if (!node_online(node)) - continue; - - raw_spin_lock_irq(&wqe->lock); - if (io_wqe_need_worker(wqe, IO_WQ_ACCT_BOUND)) - fork_worker[IO_WQ_ACCT_BOUND] = true; - if (io_wqe_need_worker(wqe, IO_WQ_ACCT_UNBOUND)) - fork_worker[IO_WQ_ACCT_UNBOUND] = true; - raw_spin_unlock_irq(&wqe->lock); - if (fork_worker[IO_WQ_ACCT_BOUND]) - create_io_worker(wq, wqe, IO_WQ_ACCT_BOUND); - if (fork_worker[IO_WQ_ACCT_UNBOUND]) - create_io_worker(wq, wqe, IO_WQ_ACCT_UNBOUND); - } - set_current_state(TASK_INTERRUPTIBLE); - schedule_timeout(HZ); - } - - if (current->task_works) - task_work_run(); -out: - if (refcount_dec_and_test(&wq->refs)) { - complete(&wq->done); - return 0; - } - /* if ERROR is set and we get here, we have workers to wake */ - if (test_bit(IO_WQ_BIT_ERROR, &wq->state)) { - rcu_read_lock(); - for_each_node(node) - io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL); - rcu_read_unlock(); + raw_spin_lock_irq(&wqe->lock); + if (io_wqe_need_worker(wqe, IO_WQ_ACCT_BOUND)) + fork_worker[IO_WQ_ACCT_BOUND] = true; + if (io_wqe_need_worker(wqe, IO_WQ_ACCT_UNBOUND)) + fork_worker[IO_WQ_ACCT_UNBOUND] = true; + raw_spin_unlock_irq(&wqe->lock); + if (fork_worker[IO_WQ_ACCT_BOUND]) + create_io_worker(wq, wqe, IO_WQ_ACCT_BOUND); + if (fork_worker[IO_WQ_ACCT_UNBOUND]) + create_io_worker(wq, wqe, IO_WQ_ACCT_UNBOUND); } - return 0; } -static bool io_wq_can_queue(struct io_wqe *wqe, struct io_wqe_acct *acct, - struct io_wq_work *work) +/* + * Manager thread. Tasked with creating new workers, if we need them. + */ +static int io_wq_manager(void *data) { - bool free_worker; - - if (!(work->flags & IO_WQ_WORK_UNBOUND)) - return true; - if (atomic_read(&acct->nr_running)) - return true; + struct io_wq *wq = data; + char buf[TASK_COMM_LEN]; - rcu_read_lock(); - free_worker = !hlist_nulls_empty(&wqe->free_list); - rcu_read_unlock(); - if (free_worker) - return true; + sprintf(buf, "iou-mgr-%d", wq->task_pid); + set_task_comm(current, buf); + current->flags |= PF_IO_WORKER; + wq->manager = current; - if (atomic_read(&wqe->wq->user->processes) >= acct->max_workers && - !(capable(CAP_SYS_RESOURCE) || capable(CAP_SYS_ADMIN))) - return false; + complete(&wq->done); - return true; + do { + set_current_state(TASK_INTERRUPTIBLE); + io_wq_check_workers(wq); + schedule_timeout(HZ); + if (fatal_signal_pending(current)) + set_bit(IO_WQ_BIT_EXIT, &wq->state); + } while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)); + + io_wq_check_workers(wq); + wq->manager = NULL; + io_wq_put(wq); + do_exit(0); } static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe) @@ -872,20 +785,37 @@ append: wq_list_add_after(&work->list, &tail->list, &wqe->work_list); } +static int io_wq_fork_manager(struct io_wq *wq) +{ + int ret; + + if (wq->manager) + return 0; + + clear_bit(IO_WQ_BIT_EXIT, &wq->state); + refcount_inc(&wq->refs); + current->flags |= PF_IO_WORKER; + ret = io_wq_fork_thread(io_wq_manager, wq); + current->flags &= ~PF_IO_WORKER; + if (ret >= 0) { + wait_for_completion(&wq->done); + return 0; + } + + io_wq_put(wq); + return ret; +} + static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work) { struct io_wqe_acct *acct = io_work_get_acct(wqe, work); int work_flags; unsigned long flags; - /* - * Do early check to see if we need a new unbound worker, and if we do, - * if we're allowed to do so. This isn't 100% accurate as there's a - * gap between this check and incrementing the value, but that's OK. - * It's close enough to not be an issue, fork() has the same delay. - */ - if (unlikely(!io_wq_can_queue(wqe, acct, work))) { - io_run_cancel(work, wqe); + /* Can only happen if manager creation fails after exec */ + if (unlikely(io_wq_fork_manager(wqe->wq))) { + work->flags |= IO_WQ_WORK_CANCEL; + wqe->wq->do_work(work); return; } @@ -939,7 +869,7 @@ static bool io_wq_worker_cancel(struct io_worker *worker, void *data) spin_lock_irqsave(&worker->lock, flags); if (worker->cur_work && match->fn(worker->cur_work, match->data)) { - send_sig(SIGINT, worker->task, 1); + set_notify_signal(worker->task); match->nr_running++; } spin_unlock_irqrestore(&worker->lock, flags); @@ -1043,6 +973,24 @@ enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel, return IO_WQ_CANCEL_NOTFOUND; } +static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode, + int sync, void *key) +{ + struct io_wqe *wqe = container_of(wait, struct io_wqe, wait); + int ret; + + list_del_init(&wait->entry); + + rcu_read_lock(); + ret = io_wqe_activate_free_worker(wqe); + rcu_read_unlock(); + + if (!ret) + wake_up_process(wqe->wq->manager); + + return 1; +} + struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) { int ret = -ENOMEM, node; @@ -1063,12 +1011,11 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) if (ret) goto err_wqes; + refcount_inc(&data->hash->refs); + wq->hash = data->hash; wq->free_work = data->free_work; wq->do_work = data->do_work; - /* caller must already hold a reference to this */ - wq->user = data->user; - ret = -ENOMEM; for_each_node(node) { struct io_wqe *wqe; @@ -1083,11 +1030,11 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) wqe->node = alloc_node; wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded; atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0); - if (wq->user) { - wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers = + wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers = task_rlimit(current, RLIMIT_NPROC); - } atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0); + wqe->wait.func = io_wqe_hash_wake; + INIT_LIST_HEAD(&wqe->wait.entry); wqe->wq = wq; raw_spin_lock_init(&wqe->lock); INIT_WQ_LIST(&wqe->work_list); @@ -1095,23 +1042,16 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) INIT_LIST_HEAD(&wqe->all_list); } + wq->task_pid = current->pid; init_completion(&wq->done); + refcount_set(&wq->refs, 1); - wq->manager = kthread_create(io_wq_manager, wq, "io_wq_manager"); - if (!IS_ERR(wq->manager)) { - wake_up_process(wq->manager); - wait_for_completion(&wq->done); - if (test_bit(IO_WQ_BIT_ERROR, &wq->state)) { - ret = -ENOMEM; - goto err; - } - refcount_set(&wq->use_refs, 1); - reinit_completion(&wq->done); + ret = io_wq_fork_manager(wq); + if (!ret) return wq; - } - ret = PTR_ERR(wq->manager); - complete(&wq->done); + io_wq_put(wq); + io_wq_put_hash(data->hash); err: cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node); for_each_node(node) @@ -1123,15 +1063,7 @@ err_wq: return ERR_PTR(ret); } -bool io_wq_get(struct io_wq *wq, struct io_wq_data *data) -{ - if (data->free_work != wq->free_work || data->do_work != wq->do_work) - return false; - - return refcount_inc_not_zero(&wq->use_refs); -} - -static void __io_wq_destroy(struct io_wq *wq) +static void io_wq_destroy(struct io_wq *wq) { int node; @@ -1139,30 +1071,31 @@ static void __io_wq_destroy(struct io_wq *wq) set_bit(IO_WQ_BIT_EXIT, &wq->state); if (wq->manager) - kthread_stop(wq->manager); + wake_up_process(wq->manager); rcu_read_lock(); for_each_node(node) io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL); rcu_read_unlock(); - wait_for_completion(&wq->done); + spin_lock_irq(&wq->hash->wait.lock); + for_each_node(node) { + struct io_wqe *wqe = wq->wqes[node]; - for_each_node(node) - kfree(wq->wqes[node]); + list_del_init(&wqe->wait.entry); + kfree(wqe); + } + spin_unlock_irq(&wq->hash->wait.lock); + io_wq_put_hash(wq->hash); kfree(wq->wqes); kfree(wq); -} -void io_wq_destroy(struct io_wq *wq) -{ - if (refcount_dec_and_test(&wq->use_refs)) - __io_wq_destroy(wq); } -struct task_struct *io_wq_get_task(struct io_wq *wq) +void io_wq_put(struct io_wq *wq) { - return wq->manager; + if (refcount_dec_and_test(&wq->refs)) + io_wq_destroy(wq); } static bool io_wq_worker_affinity(struct io_worker *worker, void *data) diff --git a/fs/io-wq.h b/fs/io-wq.h index 096f1021018e..b6ca12b60c35 100644 --- a/fs/io-wq.h +++ b/fs/io-wq.h @@ -1,6 +1,7 @@ #ifndef INTERNAL_IO_WQ_H #define INTERNAL_IO_WQ_H +#include <linux/refcount.h> #include <linux/io_uring.h> struct io_wq; @@ -11,13 +12,6 @@ enum { IO_WQ_WORK_UNBOUND = 4, IO_WQ_WORK_CONCURRENT = 16, - IO_WQ_WORK_FILES = 32, - IO_WQ_WORK_FS = 64, - IO_WQ_WORK_MM = 128, - IO_WQ_WORK_CREDS = 256, - IO_WQ_WORK_BLKCG = 512, - IO_WQ_WORK_FSIZE = 1024, - IO_WQ_HASH_SHIFT = 24, /* upper 8 bits are used for hash key */ }; @@ -85,7 +79,7 @@ static inline void wq_list_del(struct io_wq_work_list *list, struct io_wq_work { struct io_wq_work_node list; - struct io_identity *identity; + const struct cred *creds; unsigned flags; }; @@ -100,20 +94,32 @@ static inline struct io_wq_work *wq_next_work(struct io_wq_work *work) typedef struct io_wq_work *(free_work_fn)(struct io_wq_work *); typedef void (io_wq_work_fn)(struct io_wq_work *); -struct io_wq_data { - struct user_struct *user; +struct io_wq_hash { + refcount_t refs; + unsigned long map; + struct wait_queue_head wait; +}; +static inline void io_wq_put_hash(struct io_wq_hash *hash) +{ + if (refcount_dec_and_test(&hash->refs)) + kfree(hash); +} + +struct io_wq_data { + struct io_wq_hash *hash; io_wq_work_fn *do_work; free_work_fn *free_work; }; struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data); -bool io_wq_get(struct io_wq *wq, struct io_wq_data *data); -void io_wq_destroy(struct io_wq *wq); +void io_wq_put(struct io_wq *wq); void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work); void io_wq_hash_work(struct io_wq_work *work, void *val); +pid_t io_wq_fork_thread(int (*fn)(void *), void *arg); + static inline bool io_wq_is_hashed(struct io_wq_work *work) { return work->flags & IO_WQ_WORK_HASHED; @@ -124,8 +130,6 @@ typedef bool (work_cancel_fn)(struct io_wq_work *, void *); enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel, void *data, bool cancel_all); -struct task_struct *io_wq_get_task(struct io_wq *wq); - #if defined(CONFIG_IO_WQ) extern void io_wq_worker_sleeping(struct task_struct *); extern void io_wq_worker_running(struct task_struct *); @@ -140,6 +144,7 @@ static inline void io_wq_worker_running(struct task_struct *tsk) static inline bool io_wq_current_is_worker(void) { - return in_task() && (current->flags & PF_IO_WORKER); + return in_task() && (current->flags & PF_IO_WORKER) && + current->pf_io_worker; } #endif diff --git a/fs/io_uring.c b/fs/io_uring.c index c9f5f295c2ac..4a088581b0f2 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -57,7 +57,6 @@ #include <linux/mman.h> #include <linux/percpu.h> #include <linux/slab.h> -#include <linux/kthread.h> #include <linux/blkdev.h> #include <linux/bvec.h> #include <linux/net.h> @@ -254,6 +253,11 @@ struct io_restriction { bool registered; }; +enum { + IO_SQ_THREAD_SHOULD_STOP = 0, + IO_SQ_THREAD_SHOULD_PARK, +}; + struct io_sq_data { refcount_t refs; struct mutex lock; @@ -267,6 +271,13 @@ struct io_sq_data { struct wait_queue_head wait; unsigned sq_thread_idle; + int sq_cpu; + pid_t task_pid; + + unsigned long state; + struct completion startup; + struct completion completion; + struct completion exited; }; #define IO_IOPOLL_BATCH 8 @@ -323,12 +334,12 @@ struct io_ring_ctx { struct { unsigned int flags; unsigned int compat: 1; - unsigned int limit_mem: 1; unsigned int cq_overflow_flushed: 1; unsigned int drain_next: 1; unsigned int eventfd_async: 1; unsigned int restricted: 1; unsigned int sqo_dead: 1; + unsigned int sqo_exec: 1; /* * Ring buffer of indices into array of io_uring_sqe, which is @@ -350,6 +361,9 @@ struct io_ring_ctx { unsigned cached_cq_overflow; unsigned long sq_check_overflow; + /* hashed buffered write serialization */ + struct io_wq_hash *hash_map; + struct list_head defer_list; struct list_head timeout_list; struct list_head cq_overflow_list; @@ -366,22 +380,14 @@ struct io_ring_ctx { struct io_rings *rings; - /* IO offload */ - struct io_wq *io_wq; - /* - * For SQPOLL usage - we hold a reference to the parent task, so we - * have access to the ->files + * For SQPOLL usage */ struct task_struct *sqo_task; /* Only used for accounting purposes */ struct mm_struct *mm_account; -#ifdef CONFIG_BLK_CGROUP - struct cgroup_subsys_state *sqo_blkcg_css; -#endif - struct io_sq_data *sq_data; /* if using sq thread polling */ struct wait_queue_head sqo_sq_wait; @@ -401,13 +407,6 @@ struct io_ring_ctx { struct user_struct *user; - const struct cred *creds; - -#ifdef CONFIG_AUDIT - kuid_t loginuid; - unsigned int sessionid; -#endif - struct completion ref_comp; struct completion sq_thread_comp; @@ -456,6 +455,11 @@ struct io_ring_ctx { struct io_restriction restrictions; + /* exit task_work */ + struct callback_head *exit_task_work; + + struct wait_queue_head hash_wait; + /* Keep this last, we don't need it for the fast path */ struct work_struct exit_work; }; @@ -838,7 +842,6 @@ struct io_op_def { unsigned plug : 1; /* size of async data needed, if any */ unsigned short async_size; - unsigned work_flags; }; static const struct io_op_def io_op_defs[] = { @@ -851,7 +854,6 @@ static const struct io_op_def io_op_defs[] = { .needs_async_data = 1, .plug = 1, .async_size = sizeof(struct io_async_rw), - .work_flags = IO_WQ_WORK_MM | IO_WQ_WORK_BLKCG, }, [IORING_OP_WRITEV] = { .needs_file = 1, @@ -861,12 +863,9 @@ static const struct io_op_def io_op_defs[] = { .needs_async_data = 1, .plug = 1, .async_size = sizeof(struct io_async_rw), - .work_flags = IO_WQ_WORK_MM | IO_WQ_WORK_BLKCG | - IO_WQ_WORK_FSIZE, }, [IORING_OP_FSYNC] = { .needs_file = 1, - .work_flags = IO_WQ_WORK_BLKCG, }, [IORING_OP_READ_FIXED] = { .needs_file = 1, @@ -874,7 +873,6 @@ static const struct io_op_def io_op_defs[] = { .pollin = 1, .plug = 1, .async_size = sizeof(struct io_async_rw), - .work_flags = IO_WQ_WORK_BLKCG | IO_WQ_WORK_MM, }, [IORING_OP_WRITE_FIXED] = { .needs_file = 1, @@ -883,8 +881,6 @@ static const struct io_op_def io_op_defs[] = { .pollout = 1, .plug = 1, .async_size = sizeof(struct io_async_rw), - .work_flags = IO_WQ_WORK_BLKCG | IO_WQ_WORK_FSIZE | - IO_WQ_WORK_MM, }, [IORING_OP_POLL_ADD] = { .needs_file = 1, @@ -893,7 +889,6 @@ static const struct io_op_def io_op_defs[] = { [IORING_OP_POLL_REMOVE] = {}, [IORING_OP_SYNC_FILE_RANGE] = { .needs_file = 1, - .work_flags = IO_WQ_WORK_BLKCG, }, [IORING_OP_SENDMSG] = { .needs_file = 1, @@ -901,8 +896,6 @@ static const struct io_op_def io_op_defs[] = { .pollout = 1, .needs_async_data = 1, .async_size = sizeof(struct io_async_msghdr), - .work_flags = IO_WQ_WORK_MM | IO_WQ_WORK_BLKCG | - IO_WQ_WORK_FS, }, [IORING_OP_RECVMSG] = { .needs_file = 1, @@ -911,29 +904,23 @@ static const struct io_op_def io_op_defs[] = { .buffer_select = 1, .needs_async_data = 1, .async_size = sizeof(struct io_async_msghdr), - .work_flags = IO_WQ_WORK_MM | IO_WQ_WORK_BLKCG | - IO_WQ_WORK_FS, }, [IORING_OP_TIMEOUT] = { .needs_async_data = 1, .async_size = sizeof(struct io_timeout_data), - .work_flags = IO_WQ_WORK_MM, }, [IORING_OP_TIMEOUT_REMOVE] = { /* used by timeout updates' prep() */ - .work_flags = IO_WQ_WORK_MM, }, [IORING_OP_ACCEPT] = { .needs_file = 1, .unbound_nonreg_file = 1, .pollin = 1, - .work_flags = IO_WQ_WORK_MM | IO_WQ_WORK_FILES, }, [IORING_OP_ASYNC_CANCEL] = {}, [IORING_OP_LINK_TIMEOUT] = { .needs_async_data = 1, .async_size = sizeof(struct io_timeout_data), - .work_flags = IO_WQ_WORK_MM, }, [IORING_OP_CONNECT] = { .needs_file = 1, @@ -941,26 +928,14 @@ static const struct io_op_def io_op_defs[] = { .pollout = 1, .needs_async_data = 1, .async_size = sizeof(struct io_async_connect), - .work_flags = IO_WQ_WORK_MM, }, [IORING_OP_FALLOCATE] = { .needs_file = 1, - .work_flags = IO_WQ_WORK_BLKCG | IO_WQ_WORK_FSIZE, - }, - [IORING_OP_OPENAT] = { - .work_flags = IO_WQ_WORK_FILES | IO_WQ_WORK_BLKCG | - IO_WQ_WORK_FS | IO_WQ_WORK_MM, - }, - [IORING_OP_CLOSE] = { - .work_flags = IO_WQ_WORK_FILES | IO_WQ_WORK_BLKCG, - }, - [IORING_OP_FILES_UPDATE] = { - .work_flags = IO_WQ_WORK_FILES | IO_WQ_WORK_MM, - }, - [IORING_OP_STATX] = { - .work_flags = IO_WQ_WORK_FILES | IO_WQ_WORK_MM | - IO_WQ_WORK_FS | IO_WQ_WORK_BLKCG, }, + [IORING_OP_OPENAT] = {}, + [IORING_OP_CLOSE] = {}, + [IORING_OP_FILES_UPDATE] = {}, + [IORING_OP_STATX] = {}, [IORING_OP_READ] = { .needs_file = 1, .unbound_nonreg_file = 1, @@ -968,7 +943,6 @@ static const struct io_op_def io_op_defs[] = { .buffer_select = 1, .plug = 1, .async_size = sizeof(struct io_async_rw), - .work_flags = IO_WQ_WORK_MM | IO_WQ_WORK_BLKCG, }, [IORING_OP_WRITE] = { .needs_file = 1, @@ -976,42 +950,31 @@ static const struct io_op_def io_op_defs[] = { .pollout = 1, .plug = 1, .async_size = sizeof(struct io_async_rw), - .work_flags = IO_WQ_WORK_MM | IO_WQ_WORK_BLKCG | - IO_WQ_WORK_FSIZE, }, [IORING_OP_FADVISE] = { .needs_file = 1, - .work_flags = IO_WQ_WORK_BLKCG, - }, - [IORING_OP_MADVISE] = { - .work_flags = IO_WQ_WORK_MM | IO_WQ_WORK_BLKCG, }, + [IORING_OP_MADVISE] = {}, [IORING_OP_SEND] = { .needs_file = 1, .unbound_nonreg_file = 1, .pollout = 1, - .work_flags = IO_WQ_WORK_MM | IO_WQ_WORK_BLKCG, }, [IORING_OP_RECV] = { .needs_file = 1, .unbound_nonreg_file = 1, .pollin = 1, .buffer_select = 1, - .work_flags = IO_WQ_WORK_MM | IO_WQ_WORK_BLKCG, }, [IORING_OP_OPENAT2] = { - .work_flags = IO_WQ_WORK_FILES | IO_WQ_WORK_FS | - IO_WQ_WORK_BLKCG | IO_WQ_WORK_MM, }, [IORING_OP_EPOLL_CTL] = { .unbound_nonreg_file = 1, - .work_flags = IO_WQ_WORK_FILES, }, [IORING_OP_SPLICE] = { .needs_file = 1, .hash_reg_file = 1, .unbound_nonreg_file = 1, - .work_flags = IO_WQ_WORK_BLKCG, }, [IORING_OP_PROVIDE_BUFFERS] = {}, [IORING_OP_REMOVE_BUFFERS] = {}, @@ -1023,19 +986,14 @@ static const struct io_op_def io_op_defs[] = { [IORING_OP_SHUTDOWN] = { .needs_file = 1, }, - [IORING_OP_RENAMEAT] = { - .work_flags = IO_WQ_WORK_MM | IO_WQ_WORK_FILES | - IO_WQ_WORK_FS | IO_WQ_WORK_BLKCG, - }, - [IORING_OP_UNLINKAT] = { - .work_flags = IO_WQ_WORK_MM | IO_WQ_WORK_FILES | - IO_WQ_WORK_FS | IO_WQ_WORK_BLKCG, - }, + [IORING_OP_RENAMEAT] = {}, + [IORING_OP_UNLINKAT] = {}, }; static void io_uring_try_cancel_requests(struct io_ring_ctx *ctx, struct task_struct *task, struct files_struct *files); +static void io_uring_cancel_sqpoll(struct io_ring_ctx *ctx); static void destroy_fixed_rsrc_ref_node(struct fixed_rsrc_ref_node *ref_node); static struct fixed_rsrc_ref_node *alloc_fixed_rsrc_ref_node( struct io_ring_ctx *ctx); @@ -1126,161 +1084,18 @@ static bool io_match_task(struct io_kiocb *head, continue; if (req->file && req->file->f_op == &io_uring_fops) return true; - if ((req->work.flags & IO_WQ_WORK_FILES) && - req->work.identity->files == files) + if (req->task->files == files) return true; } return false; } -static void io_sq_thread_drop_mm_files(void) -{ - struct files_struct *files = current->files; - struct mm_struct *mm = current->mm; - - if (mm) { - kthread_unuse_mm(mm); - mmput(mm); - current->mm = NULL; - } - if (files) { - struct nsproxy *nsproxy = current->nsproxy; - - task_lock(current); - current->files = NULL; - current->nsproxy = NULL; - task_unlock(current); - put_files_struct(files); - put_nsproxy(nsproxy); - } -} - -static int __io_sq_thread_acquire_files(struct io_ring_ctx *ctx) -{ - if (!current->files) { - struct files_struct *files; - struct nsproxy *nsproxy; - - task_lock(ctx->sqo_task); - files = ctx->sqo_task->files; - if (!files) { - task_unlock(ctx->sqo_task); - return -EOWNERDEAD; - } - atomic_inc(&files->count); - get_nsproxy(ctx->sqo_task->nsproxy); - nsproxy = ctx->sqo_task->nsproxy; - task_unlock(ctx->sqo_task); - - task_lock(current); - current->files = files; - current->nsproxy = nsproxy; - task_unlock(current); - } - return 0; -} - -static int __io_sq_thread_acquire_mm(struct io_ring_ctx *ctx) -{ - struct mm_struct *mm; - - if (current->mm) - return 0; - - task_lock(ctx->sqo_task); - mm = ctx->sqo_task->mm; - if (unlikely(!mm || !mmget_not_zero(mm))) - mm = NULL; - task_unlock(ctx->sqo_task); - - if (mm) { - kthread_use_mm(mm); - return 0; - } - - return -EFAULT; -} - -static int __io_sq_thread_acquire_mm_files(struct io_ring_ctx *ctx, - struct io_kiocb *req) -{ - const struct io_op_def *def = &io_op_defs[req->opcode]; - int ret; - - if (def->work_flags & IO_WQ_WORK_MM) { - ret = __io_sq_thread_acquire_mm(ctx); - if (unlikely(ret)) - return ret; - } - - if (def->needs_file || (def->work_flags & IO_WQ_WORK_FILES)) { - ret = __io_sq_thread_acquire_files(ctx); - if (unlikely(ret)) - return ret; - } - - return 0; -} - -static inline int io_sq_thread_acquire_mm_files(struct io_ring_ctx *ctx, - struct io_kiocb *req) -{ - if (!(ctx->flags & IORING_SETUP_SQPOLL)) - return 0; - return __io_sq_thread_acquire_mm_files(ctx, req); -} - -static void io_sq_thread_associate_blkcg(struct io_ring_ctx *ctx, - struct cgroup_subsys_state **cur_css) - -{ -#ifdef CONFIG_BLK_CGROUP - /* puts the old one when swapping */ - if (*cur_css != ctx->sqo_blkcg_css) { - kthread_associate_blkcg(ctx->sqo_blkcg_css); - *cur_css = ctx->sqo_blkcg_css; - } -#endif -} - -static void io_sq_thread_unassociate_blkcg(void) -{ -#ifdef CONFIG_BLK_CGROUP - kthread_associate_blkcg(NULL); -#endif -} - static inline void req_set_fail_links(struct io_kiocb *req) { if ((req->flags & (REQ_F_LINK | REQ_F_HARDLINK)) == REQ_F_LINK) req->flags |= REQ_F_FAIL_LINK; } -/* - * None of these are dereferenced, they are simply used to check if any of - * them have changed. If we're under current and check they are still the - * same, we're fine to grab references to them for actual out-of-line use. - */ -static void io_init_identity(struct io_identity *id) -{ - id->files = current->files; - id->mm = current->mm; -#ifdef CONFIG_BLK_CGROUP - rcu_read_lock(); - id->blkcg_css = blkcg_css(); - rcu_read_unlock(); -#endif - id->creds = current_cred(); - id->nsproxy = current->nsproxy; - id->fs = current->fs; - id->fsize = rlimit(RLIMIT_FSIZE); -#ifdef CONFIG_AUDIT - id->loginuid = current->loginuid; - id->sessionid = current->sessionid; -#endif - refcount_set(&id->count, 1); -} - static inline void __io_req_init_async(struct io_kiocb *req) { memset(&req->work, 0, sizeof(req->work)); @@ -1293,17 +1108,10 @@ static inline void __io_req_init_async(struct io_kiocb *req) */ static inline void io_req_init_async(struct io_kiocb *req) { - struct io_uring_task *tctx = current->io_uring; - if (req->flags & REQ_F_WORK_INITIALIZED) return; __io_req_init_async(req); - - /* Grab a ref if this isn't our static identity */ - req->work.identity = tctx->identity; - if (tctx->identity != &tctx->__identity) - refcount_inc(&req->work.identity->count); } static void io_ring_ctx_ref_free(struct percpu_ref *ref) @@ -1388,40 +1196,14 @@ static bool req_need_defer(struct io_kiocb *req, u32 seq) return false; } -static void io_put_identity(struct io_uring_task *tctx, struct io_kiocb *req) -{ - if (req->work.identity == &tctx->__identity) - return; - if (refcount_dec_and_test(&req->work.identity->count)) - kfree(req->work.identity); -} - static void io_req_clean_work(struct io_kiocb *req) { if (!(req->flags & REQ_F_WORK_INITIALIZED)) return; - if (req->work.flags & IO_WQ_WORK_MM) - mmdrop(req->work.identity->mm); -#ifdef CONFIG_BLK_CGROUP - if (req->work.flags & IO_WQ_WORK_BLKCG) - css_put(req->work.identity->blkcg_css); -#endif - if (req->work.flags & IO_WQ_WORK_CREDS) - put_cred(req->work.identity->creds); - if (req->work.flags & IO_WQ_WORK_FS) { - struct fs_struct *fs = req->work.identity->fs; - - spin_lock(&req->work.identity->fs->lock); - if (--fs->users) - fs = NULL; - spin_unlock(&req->work.identity->fs->lock); - if (fs) - free_fs_struct(fs); - } - if (req->work.flags & IO_WQ_WORK_FILES) { - put_files_struct(req->work.identity->files); - put_nsproxy(req->work.identity->nsproxy); + if (req->work.creds) { + put_cred(req->work.creds); + req->work.creds = NULL; } if (req->flags & REQ_F_INFLIGHT) { struct io_ring_ctx *ctx = req->ctx; @@ -1437,54 +1219,6 @@ static void io_req_clean_work(struct io_kiocb *req) } req->flags &= ~REQ_F_WORK_INITIALIZED; - req->work.flags &= ~(IO_WQ_WORK_MM | IO_WQ_WORK_BLKCG | IO_WQ_WORK_FS | - IO_WQ_WORK_CREDS | IO_WQ_WORK_FILES); - io_put_identity(req->task->io_uring, req); -} - -/* - * Create a private copy of io_identity, since some fields don't match - * the current context. - */ -static bool io_identity_cow(struct io_kiocb *req) -{ - struct io_uring_task *tctx = current->io_uring; - const struct cred *creds = NULL; - struct io_identity *id; - - if (req->work.flags & IO_WQ_WORK_CREDS) - creds = req->work.identity->creds; - - id = kmemdup(req->work.identity, sizeof(*id), GFP_KERNEL); - if (unlikely(!id)) { - req->work.flags |= IO_WQ_WORK_CANCEL; - return false; - } - - /* - * We can safely just re-init the creds we copied Either the field - * matches the current one, or we haven't grabbed it yet. The only - * exception is ->creds, through registered personalities, so handle - * that one separately. - */ - io_init_identity(id); - if (creds) - id->creds = creds; - - /* add one for this request */ - refcount_inc(&id->count); - - /* drop tctx and req identity references, if needed */ - if (tctx->identity != &tctx->__identity && - refcount_dec_and_test(&tctx->identity->count)) - kfree(tctx->identity); - if (req->work.identity != &tctx->__identity && - refcount_dec_and_test(&req->work.identity->count)) - kfree(req->work.identity); - - req->work.identity = id; - tctx->identity = id; - return true; } static void io_req_track_inflight(struct io_kiocb *req) @@ -1501,79 +1235,6 @@ static void io_req_track_inflight(struct io_kiocb *req) } } -static bool io_grab_identity(struct io_kiocb *req) -{ - const struct io_op_def *def = &io_op_defs[req->opcode]; - struct io_identity *id = req->work.identity; - - if (def->work_flags & IO_WQ_WORK_FSIZE) { - if (id->fsize != rlimit(RLIMIT_FSIZE)) - return false; - req->work.flags |= IO_WQ_WORK_FSIZE; - } -#ifdef CONFIG_BLK_CGROUP - if (!(req->work.flags & IO_WQ_WORK_BLKCG) && - (def->work_flags & IO_WQ_WORK_BLKCG)) { - rcu_read_lock(); - if (id->blkcg_css != blkcg_css()) { - rcu_read_unlock(); - return false; - } - /* - * This should be rare, either the cgroup is dying or the task - * is moving cgroups. Just punt to root for the handful of ios. - */ - if (css_tryget_online(id->blkcg_css)) - req->work.flags |= IO_WQ_WORK_BLKCG; - rcu_read_unlock(); - } -#endif - if (!(req->work.flags & IO_WQ_WORK_CREDS)) { - if (id->creds != current_cred()) - return false; - get_cred(id->creds); - req->work.flags |= IO_WQ_WORK_CREDS; - } -#ifdef CONFIG_AUDIT - if (!uid_eq(current->loginuid, id->loginuid) || - current->sessionid != id->sessionid) - return false; -#endif - if (!(req->work.flags & IO_WQ_WORK_FS) && - (def->work_flags & IO_WQ_WORK_FS)) { - if (current->fs != id->fs) - return false; - spin_lock(&id->fs->lock); - if (!id->fs->in_exec) { - id->fs->users++; - req->work.flags |= IO_WQ_WORK_FS; - } else { - req->work.flags |= IO_WQ_WORK_CANCEL; - } - spin_unlock(¤t->fs->lock); - } - if (!(req->work.flags & IO_WQ_WORK_FILES) && - (def->work_flags & IO_WQ_WORK_FILES) && - !(req->flags & REQ_F_NO_FILE_TABLE)) { - if (id->files != current->files || - id->nsproxy != current->nsproxy) - return false; - atomic_inc(&id->files->count); - get_nsproxy(id->nsproxy); - req->work.flags |= IO_WQ_WORK_FILES; - io_req_track_inflight(req); - } - if (!(req->work.flags & IO_WQ_WORK_MM) && - (def->work_flags & IO_WQ_WORK_MM)) { - if (id->mm != current->mm) - return false; - mmgrab(id->mm); - req->work.flags |= IO_WQ_WORK_MM; - } - - return true; -} - static void io_prep_async_work(struct io_kiocb *req) { const struct io_op_def *def = &io_op_defs[req->opcode]; @@ -1591,17 +1252,8 @@ static void io_prep_async_work(struct io_kiocb *req) if (def->unbound_nonreg_file) req->work.flags |= IO_WQ_WORK_UNBOUND; } - - /* if we fail grabbing identity, we must COW, regrab, and retry */ - if (io_grab_identity(req)) - return; - - if (!io_identity_cow(req)) - return; - - /* can't fail at this point */ - if (!io_grab_identity(req)) - WARN_ON(1); + if (!req->work.creds) + req->work.creds = get_current_cred(); } static void io_prep_async_link(struct io_kiocb *req) @@ -1616,10 +1268,14 @@ static struct io_kiocb *__io_queue_async_work(struct io_kiocb *req) { struct io_ring_ctx *ctx = req->ctx; struct io_kiocb *link = io_prep_linked_timeout(req); + struct io_uring_task *tctx = req->task->io_uring; + + BUG_ON(!tctx); + BUG_ON(!tctx->io_wq); trace_io_uring_queue_async_work(ctx, io_wq_is_hashed(&req->work), req, &req->work, req->flags); - io_wq_enqueue(ctx->io_wq, &req->work); + io_wq_enqueue(tctx->io_wq, &req->work); return link; } @@ -2313,11 +1969,14 @@ static int io_req_task_work_add(struct io_kiocb *req) static void io_req_task_work_add_fallback(struct io_kiocb *req, task_work_func_t cb) { - struct task_struct *tsk = io_wq_get_task(req->ctx->io_wq); + struct io_ring_ctx *ctx = req->ctx; + struct callback_head *head; init_task_work(&req->task_work, cb); - task_work_add(tsk, &req->task_work, TWA_NONE); - wake_up_process(tsk); + do { + head = READ_ONCE(ctx->exit_task_work); + req->task_work.next = head; + } while (cmpxchg(&ctx->exit_task_work, head, &req->task_work) != head); } static void __io_req_task_cancel(struct io_kiocb *req, int error) @@ -2351,15 +2010,11 @@ static void __io_req_task_submit(struct io_kiocb *req) /* ctx stays valid until unlock, even if we drop all ours ctx->refs */ mutex_lock(&ctx->uring_lock); - if (!ctx->sqo_dead && !(current->flags & PF_EXITING) && - !io_sq_thread_acquire_mm_files(ctx, req)) + if (!ctx->sqo_dead && !(current->flags & PF_EXITING) && !current->in_execve) __io_queue_sqe(req); else __io_req_task_cancel(req, -EFAULT); mutex_unlock(&ctx->uring_lock); - - if (ctx->flags & IORING_SETUP_SQPOLL) - io_sq_thread_drop_mm_files(); } static void io_req_task_submit(struct callback_head *cb) @@ -2823,7 +2478,6 @@ static bool io_rw_reissue(struct io_kiocb *req) { #ifdef CONFIG_BLOCK umode_t mode = file_inode(req->file)->i_mode; - int ret; if (!S_ISBLK(mode) && !S_ISREG(mode)) return false; @@ -2839,9 +2493,7 @@ static bool io_rw_reissue(struct io_kiocb *req) lockdep_assert_held(&req->ctx->uring_lock); - ret = io_sq_thread_acquire_mm_files(req->ctx, req); - - if (!ret && io_resubmit_prep(req)) { + if (io_resubmit_prep(req)) { refcount_inc(&req->refs); io_queue_async_work(req); return true; @@ -5946,12 +5598,15 @@ static bool io_cancel_cb(struct io_wq_work *work, void *data) return req->user_data == (unsigned long) data; } -static int io_async_cancel_one(struct io_ring_ctx *ctx, void *sqe_addr) +static int io_async_cancel_one(struct io_uring_task *tctx, void *sqe_addr) { enum io_wq_cancel cancel_ret; int ret = 0; - cancel_ret = io_wq_cancel_cb(ctx->io_wq, io_cancel_cb, sqe_addr, false); + if (!tctx->io_wq) + return -ENOENT; + + cancel_ret = io_wq_cancel_cb(tctx->io_wq, io_cancel_cb, sqe_addr, false); switch (cancel_ret) { case IO_WQ_CANCEL_OK: ret = 0; @@ -5974,7 +5629,8 @@ static void io_async_find_and_cancel(struct io_ring_ctx *ctx, unsigned long flags; int ret; - ret = io_async_cancel_one(ctx, (void *) (unsigned long) sqe_addr); + ret = io_async_cancel_one(req->task->io_uring, + (void *) (unsigned long) sqe_addr); if (ret != -ENOENT) { spin_lock_irqsave(&ctx->completion_lock, flags); goto done; @@ -6563,10 +6219,9 @@ static void __io_queue_sqe(struct io_kiocb *req) const struct cred *old_creds = NULL; int ret; - if ((req->flags & REQ_F_WORK_INITIALIZED) && - (req->work.flags & IO_WQ_WORK_CREDS) && - req->work.identity->creds != current_cred()) - old_creds = override_creds(req->work.identity->creds); + if ((req->flags & REQ_F_WORK_INITIALIZED) && req->work.creds && + req->work.creds != current_cred()) + old_creds = override_creds(req->work.creds); ret = io_issue_sqe(req, IO_URING_F_NONBLOCK|IO_URING_F_COMPLETE_DEFER); @@ -6684,9 +6339,6 @@ static int io_init_req(struct io_ring_ctx *ctx, struct io_kiocb *req, if (unlikely(req->opcode >= IORING_OP_LAST)) return -EINVAL; - if (unlikely(io_sq_thread_acquire_mm_files(ctx, req))) - return -EFAULT; - if (unlikely(!io_check_restriction(ctx, req, sqe_flags))) return -EACCES; @@ -6696,17 +6348,11 @@ static int io_init_req(struct io_ring_ctx *ctx, struct io_kiocb *req, id = READ_ONCE(sqe->personality); if (id) { - struct io_identity *iod; - - iod = idr_find(&ctx->personality_idr, id); - if (unlikely(!iod)) - return -EINVAL; - refcount_inc(&iod->count); - __io_req_init_async(req); - get_cred(iod->creds); - req->work.identity = iod; - req->work.flags |= IO_WQ_WORK_CREDS; + req->work.creds = idr_find(&ctx->personality_idr, id); + if (unlikely(!req->work.creds)) + return -EINVAL; + get_cred(req->work.creds); } state = &ctx->submit_state; @@ -7008,71 +6654,97 @@ static void io_sqd_init_new(struct io_sq_data *sqd) io_sqd_update_thread_idle(sqd); } +static bool io_sq_thread_should_stop(struct io_sq_data *sqd) +{ + return test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state); +} + +static bool io_sq_thread_should_park(struct io_sq_data *sqd) +{ + return test_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state); +} + +static void io_sq_thread_parkme(struct io_sq_data *sqd) +{ + for (;;) { + /* + * TASK_PARKED is a special state; we must serialize against + * possible pending wakeups to avoid store-store collisions on + * task->state. + * + * Such a collision might possibly result in the task state + * changin from TASK_PARKED and us failing the + * wait_task_inactive() in kthread_park(). + */ + set_special_state(TASK_PARKED); + if (!test_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state)) + break; + + /* + * Thread is going to call schedule(), do not preempt it, + * or the caller of kthread_park() may spend more time in + * wait_task_inactive(). + */ + preempt_disable(); + complete(&sqd->completion); + schedule_preempt_disabled(); + preempt_enable(); + } + __set_current_state(TASK_RUNNING); +} + static int io_sq_thread(void *data) { - struct cgroup_subsys_state *cur_css = NULL; - struct files_struct *old_files = current->files; - struct nsproxy *old_nsproxy = current->nsproxy; - const struct cred *old_cred = NULL; struct io_sq_data *sqd = data; struct io_ring_ctx *ctx; unsigned long timeout = 0; + char buf[TASK_COMM_LEN]; DEFINE_WAIT(wait); - task_lock(current); - current->files = NULL; - current->nsproxy = NULL; - task_unlock(current); + sprintf(buf, "iou-sqp-%d", sqd->task_pid); + set_task_comm(current, buf); + sqd->thread = current; + current->pf_io_worker = NULL; + + if (sqd->sq_cpu != -1) + set_cpus_allowed_ptr(current, cpumask_of(sqd->sq_cpu)); + else + set_cpus_allowed_ptr(current, cpu_online_mask); + current->flags |= PF_NO_SETAFFINITY; + + complete(&sqd->completion); - while (!kthread_should_stop()) { + wait_for_completion(&sqd->startup); + + while (!io_sq_thread_should_stop(sqd)) { int ret; bool cap_entries, sqt_spin, needs_sched; /* * Any changes to the sqd lists are synchronized through the - * kthread parking. This synchronizes the thread vs users, + * thread parking. This synchronizes the thread vs users, * the users are synchronized on the sqd->ctx_lock. */ - if (kthread_should_park()) { - kthread_parkme(); - /* - * When sq thread is unparked, in case the previous park operation - * comes from io_put_sq_data(), which means that sq thread is going - * to be stopped, so here needs to have a check. - */ - if (kthread_should_stop()) - break; + if (io_sq_thread_should_park(sqd)) { + io_sq_thread_parkme(sqd); + continue; } - if (unlikely(!list_empty(&sqd->ctx_new_list))) { io_sqd_init_new(sqd); timeout = jiffies + sqd->sq_thread_idle; } - + if (fatal_signal_pending(current)) + break; sqt_spin = false; cap_entries = !list_is_singular(&sqd->ctx_list); list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) { - if (current->cred != ctx->creds) { - if (old_cred) - revert_creds(old_cred); - old_cred = override_creds(ctx->creds); - } - io_sq_thread_associate_blkcg(ctx, &cur_css); -#ifdef CONFIG_AUDIT - current->loginuid = ctx->loginuid; - current->sessionid = ctx->sessionid; -#endif - ret = __io_sq_thread(ctx, cap_entries); if (!sqt_spin && (ret > 0 || !list_empty(&ctx->iopoll_list))) sqt_spin = true; - - io_sq_thread_drop_mm_files(); } if (sqt_spin || !time_after(jiffies, timeout)) { io_run_task_work(); - io_sq_thread_drop_mm_files(); cond_resched(); if (sqt_spin) timeout = jiffies + sqd->sq_thread_idle; @@ -7093,7 +6765,7 @@ static int io_sq_thread(void *data) } } - if (needs_sched && !kthread_should_park()) { + if (needs_sched && !io_sq_thread_should_park(sqd)) { list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) io_ring_set_wakeup_flag(ctx); @@ -7106,22 +6778,25 @@ static int io_sq_thread(void *data) timeout = jiffies + sqd->sq_thread_idle; } - io_run_task_work(); - io_sq_thread_drop_mm_files(); - - if (cur_css) - io_sq_thread_unassociate_blkcg(); - if (old_cred) - revert_creds(old_cred); + list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) + io_uring_cancel_sqpoll(ctx); - task_lock(current); - current->files = old_files; - current->nsproxy = old_nsproxy; - task_unlock(current); + io_run_task_work(); - kthread_parkme(); + /* + * Clear thread under lock so that concurrent parks work correctly + */ + complete_all(&sqd->completion); + mutex_lock(&sqd->lock); + sqd->thread = NULL; + list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) { + ctx->sqo_exec = 1; + io_ring_set_wakeup_flag(ctx); + } + mutex_unlock(&sqd->lock); - return 0; + complete(&sqd->exited); + do_exit(0); } struct io_wait_queue { @@ -7413,20 +7088,74 @@ static int io_sqe_files_unregister(struct io_ring_ctx *ctx) return 0; } +static void io_sq_thread_unpark(struct io_sq_data *sqd) + __releases(&sqd->lock) +{ + if (!sqd->thread) + return; + if (sqd->thread == current) + return; + clear_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state); + wake_up_state(sqd->thread, TASK_PARKED); + mutex_unlock(&sqd->lock); +} + +static bool io_sq_thread_park(struct io_sq_data *sqd) + __acquires(&sqd->lock) +{ + if (sqd->thread == current) + return true; + mutex_lock(&sqd->lock); + if (!sqd->thread) { + mutex_unlock(&sqd->lock); + return false; + } + set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state); + wake_up_process(sqd->thread); + wait_for_completion(&sqd->completion); + return true; +} + +static void io_sq_thread_stop(struct io_sq_data *sqd) +{ + if (!sqd->thread) + return; + + set_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state); + WARN_ON_ONCE(test_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state)); + wake_up_process(sqd->thread); + wait_for_completion(&sqd->exited); +} + static void io_put_sq_data(struct io_sq_data *sqd) { if (refcount_dec_and_test(&sqd->refs)) { - /* - * The park is a bit of a work-around, without it we get - * warning spews on shutdown with SQPOLL set and affinity - * set to a single CPU. - */ + io_sq_thread_stop(sqd); + kfree(sqd); + } +} + +static void io_sq_thread_finish(struct io_ring_ctx *ctx) +{ + struct io_sq_data *sqd = ctx->sq_data; + + if (sqd) { + complete(&sqd->startup); if (sqd->thread) { - kthread_park(sqd->thread); - kthread_stop(sqd->thread); + wait_for_completion(&ctx->sq_thread_comp); + io_sq_thread_park(sqd); } - kfree(sqd); + mutex_lock(&sqd->ctx_lock); + list_del(&ctx->sqd_list); + io_sqd_update_thread_idle(sqd); + mutex_unlock(&sqd->ctx_lock); + + if (sqd->thread) + io_sq_thread_unpark(sqd); + + io_put_sq_data(sqd); + ctx->sq_data = NULL; } } @@ -7473,68 +7202,12 @@ static struct io_sq_data *io_get_sq_data(struct io_uring_params *p) mutex_init(&sqd->ctx_lock); mutex_init(&sqd->lock); init_waitqueue_head(&sqd->wait); + init_completion(&sqd->startup); + init_completion(&sqd->completion); + init_completion(&sqd->exited); return sqd; } -static void io_sq_thread_unpark(struct io_sq_data *sqd) - __releases(&sqd->lock) -{ - if (!sqd->thread) - return; - kthread_unpark(sqd->thread); - mutex_unlock(&sqd->lock); -} - -static void io_sq_thread_park(struct io_sq_data *sqd) - __acquires(&sqd->lock) -{ - if (!sqd->thread) - return; - mutex_lock(&sqd->lock); - kthread_park(sqd->thread); -} - -static void io_sq_thread_stop(struct io_ring_ctx *ctx) -{ - struct io_sq_data *sqd = ctx->sq_data; - - if (sqd) { - if (sqd->thread) { - /* - * We may arrive here from the error branch in - * io_sq_offload_create() where the kthread is created - * without being waked up, thus wake it up now to make - * sure the wait will complete. - */ - wake_up_process(sqd->thread); - wait_for_completion(&ctx->sq_thread_comp); - - io_sq_thread_park(sqd); - } - - mutex_lock(&sqd->ctx_lock); - list_del(&ctx->sqd_list); - io_sqd_update_thread_idle(sqd); - mutex_unlock(&sqd->ctx_lock); - - if (sqd->thread) - io_sq_thread_unpark(sqd); - - io_put_sq_data(sqd); - ctx->sq_data = NULL; - } -} - -static void io_finish_async(struct io_ring_ctx *ctx) -{ - io_sq_thread_stop(ctx); - - if (ctx->io_wq) { - io_wq_destroy(ctx->io_wq); - ctx->io_wq = NULL; - } -} - #if defined(CONFIG_UNIX) /* * Ensure the UNIX gc is aware of our file set, so we are certain that @@ -7561,7 +7234,7 @@ static int __io_sqe_files_scm(struct io_ring_ctx *ctx, int nr, int offset) skb->sk = sk; nr_files = 0; - fpl->user = get_uid(ctx->user); + fpl->user = get_uid(current_user()); for (i = 0; i < nr; i++) { struct file *file = io_file_from_index(ctx, i + offset); @@ -8093,54 +7766,34 @@ static struct io_wq_work *io_free_work(struct io_wq_work *work) return req ? &req->work : NULL; } -static int io_init_wq_offload(struct io_ring_ctx *ctx, - struct io_uring_params *p) +static struct io_wq *io_init_wq_offload(struct io_ring_ctx *ctx) { + struct io_wq_hash *hash; struct io_wq_data data; - struct fd f; - struct io_ring_ctx *ctx_attach; unsigned int concurrency; - int ret = 0; - - data.user = ctx->user; - data.free_work = io_free_work; - data.do_work = io_wq_submit_work; - - if (!(p->flags & IORING_SETUP_ATTACH_WQ)) { - /* Do QD, or 4 * CPUS, whatever is smallest */ - concurrency = min(ctx->sq_entries, 4 * num_online_cpus()); - ctx->io_wq = io_wq_create(concurrency, &data); - if (IS_ERR(ctx->io_wq)) { - ret = PTR_ERR(ctx->io_wq); - ctx->io_wq = NULL; - } - return ret; + hash = ctx->hash_map; + if (!hash) { + hash = kzalloc(sizeof(*hash), GFP_KERNEL); + if (!hash) + return ERR_PTR(-ENOMEM); + refcount_set(&hash->refs, 1); + init_waitqueue_head(&hash->wait); + ctx->hash_map = hash; } - f = fdget(p->wq_fd); - if (!f.file) - return -EBADF; - - if (f.file->f_op != &io_uring_fops) { - ret = -EINVAL; - goto out_fput; - } + data.hash = hash; + data.free_work = io_free_work; + data.do_work = io_wq_submit_work; - ctx_attach = f.file->private_data; - /* @io_wq is protected by holding the fd */ - if (!io_wq_get(ctx_attach->io_wq, &data)) { - ret = -EINVAL; - goto out_fput; - } + /* Do QD, or 4 * CPUS, whatever is smallest */ + concurrency = min(ctx->sq_entries, 4 * num_online_cpus()); - ctx->io_wq = ctx_attach->io_wq; -out_fput: - fdput(f); - return ret; + return io_wq_create(concurrency, &data); } -static int io_uring_alloc_task_context(struct task_struct *task) +static int io_uring_alloc_task_context(struct task_struct *task, + struct io_ring_ctx *ctx) { struct io_uring_task *tctx; int ret; @@ -8155,13 +7808,19 @@ static int io_uring_alloc_task_context(struct task_struct *task) return ret; } + tctx->io_wq = io_init_wq_offload(ctx); + if (IS_ERR(tctx->io_wq)) { + ret = PTR_ERR(tctx->io_wq); + percpu_counter_destroy(&tctx->inflight); + kfree(tctx); + return ret; + } + xa_init(&tctx->xa); init_waitqueue_head(&tctx->wait); tctx->last = NULL; atomic_set(&tctx->in_idle, 0); tctx->sqpoll = false; - io_init_identity(&tctx->__identity); - tctx->identity = &tctx->__identity; task->io_uring = tctx; spin_lock_init(&tctx->task_lock); INIT_WQ_LIST(&tctx->task_list); @@ -8175,19 +7834,49 @@ void __io_uring_free(struct task_struct *tsk) struct io_uring_task *tctx = tsk->io_uring; WARN_ON_ONCE(!xa_empty(&tctx->xa)); - WARN_ON_ONCE(refcount_read(&tctx->identity->count) != 1); - if (tctx->identity != &tctx->__identity) - kfree(tctx->identity); percpu_counter_destroy(&tctx->inflight); kfree(tctx); tsk->io_uring = NULL; } +static int io_sq_thread_fork(struct io_sq_data *sqd, struct io_ring_ctx *ctx) +{ + int ret; + + clear_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state); + reinit_completion(&sqd->completion); + ctx->sqo_dead = ctx->sqo_exec = 0; + sqd->task_pid = current->pid; + current->flags |= PF_IO_WORKER; + ret = io_wq_fork_thread(io_sq_thread, sqd); + current->flags &= ~PF_IO_WORKER; + if (ret < 0) { + sqd->thread = NULL; + return ret; + } + wait_for_completion(&sqd->completion); + return io_uring_alloc_task_context(sqd->thread, ctx); +} + static int io_sq_offload_create(struct io_ring_ctx *ctx, struct io_uring_params *p) { int ret; + /* Retain compatibility with failing for an invalid attach attempt */ + if ((ctx->flags & (IORING_SETUP_ATTACH_WQ | IORING_SETUP_SQPOLL)) == + IORING_SETUP_ATTACH_WQ) { + struct fd f; + + f = fdget(p->wq_fd); + if (!f.file) + return -ENXIO; + if (f.file->f_op != &io_uring_fops) { + fdput(f); + return -EINVAL; + } + fdput(f); + } if (ctx->flags & IORING_SETUP_SQPOLL) { struct io_sq_data *sqd; @@ -8213,7 +7902,7 @@ static int io_sq_offload_create(struct io_ring_ctx *ctx, ctx->sq_thread_idle = HZ; if (sqd->thread) - goto done; + return 0; if (p->flags & IORING_SETUP_SQ_AFF) { int cpu = p->sq_thread_cpu; @@ -8224,18 +7913,21 @@ static int io_sq_offload_create(struct io_ring_ctx *ctx, if (!cpu_online(cpu)) goto err; - sqd->thread = kthread_create_on_cpu(io_sq_thread, sqd, - cpu, "io_uring-sq"); + sqd->sq_cpu = cpu; } else { - sqd->thread = kthread_create(io_sq_thread, sqd, - "io_uring-sq"); + sqd->sq_cpu = -1; } - if (IS_ERR(sqd->thread)) { - ret = PTR_ERR(sqd->thread); + + sqd->task_pid = current->pid; + current->flags |= PF_IO_WORKER; + ret = io_wq_fork_thread(io_sq_thread, sqd); + current->flags &= ~PF_IO_WORKER; + if (ret < 0) { sqd->thread = NULL; goto err; } - ret = io_uring_alloc_task_context(sqd->thread); + wait_for_completion(&sqd->completion); + ret = io_uring_alloc_task_context(sqd->thread, ctx); if (ret) goto err; } else if (p->flags & IORING_SETUP_SQ_AFF) { @@ -8244,14 +7936,9 @@ static int io_sq_offload_create(struct io_ring_ctx *ctx, goto err; } -done: - ret = io_init_wq_offload(ctx, p); - if (ret) - goto err; - return 0; err: - io_finish_async(ctx); + io_sq_thread_finish(ctx); return ret; } @@ -8259,8 +7946,8 @@ static void io_sq_offload_start(struct io_ring_ctx *ctx) { struct io_sq_data *sqd = ctx->sq_data; - if ((ctx->flags & IORING_SETUP_SQPOLL) && sqd->thread) - wake_up_process(sqd->thread); + if (ctx->flags & IORING_SETUP_SQPOLL) + complete(&sqd->startup); } static inline void __io_unaccount_mem(struct user_struct *user, @@ -8290,7 +7977,7 @@ static inline int __io_account_mem(struct user_struct *user, static void io_unaccount_mem(struct io_ring_ctx *ctx, unsigned long nr_pages) { - if (ctx->limit_mem) + if (ctx->user) __io_unaccount_mem(ctx->user, nr_pages); if (ctx->mm_account) @@ -8301,7 +7988,7 @@ static int io_account_mem(struct io_ring_ctx *ctx, unsigned long nr_pages) { int ret; - if (ctx->limit_mem) { + if (ctx->user) { ret = __io_account_mem(ctx->user, nr_pages); if (ret) return ret; @@ -8730,21 +8417,14 @@ static void io_ring_ctx_free(struct io_ring_ctx *ctx) mutex_lock(&ctx->uring_lock); mutex_unlock(&ctx->uring_lock); - io_finish_async(ctx); + io_sq_thread_finish(ctx); io_sqe_buffers_unregister(ctx); - if (ctx->sqo_task) { - put_task_struct(ctx->sqo_task); - ctx->sqo_task = NULL; + if (ctx->mm_account) { mmdrop(ctx->mm_account); ctx->mm_account = NULL; } -#ifdef CONFIG_BLK_CGROUP - if (ctx->sqo_blkcg_css) - css_put(ctx->sqo_blkcg_css); -#endif - mutex_lock(&ctx->uring_lock); io_sqe_files_unregister(ctx); mutex_unlock(&ctx->uring_lock); @@ -8764,8 +8444,9 @@ static void io_ring_ctx_free(struct io_ring_ctx *ctx) percpu_ref_exit(&ctx->refs); free_uid(ctx->user); - put_cred(ctx->creds); io_req_caches_free(ctx, NULL); + if (ctx->hash_map) + io_wq_put_hash(ctx->hash_map); kfree(ctx->cancel_hash); kfree(ctx); } @@ -8812,13 +8493,11 @@ static int io_uring_fasync(int fd, struct file *file, int on) static int io_unregister_personality(struct io_ring_ctx *ctx, unsigned id) { - struct io_identity *iod; + const struct cred *creds; - iod = idr_remove(&ctx->personality_idr, id); - if (iod) { - put_cred(iod->creds); - if (refcount_dec_and_test(&iod->count)) - kfree(iod); + creds = idr_remove(&ctx->personality_idr, id); + if (creds) { + put_cred(creds); return 0; } @@ -8833,6 +8512,28 @@ static int io_remove_personalities(int id, void *p, void *data) return 0; } +static void io_run_ctx_fallback(struct io_ring_ctx *ctx) +{ + struct callback_head *work, *head, *next; + + do { + do { + head = NULL; + work = READ_ONCE(ctx->exit_task_work); + } while (cmpxchg(&ctx->exit_task_work, work, head) != work); + + if (!work) + break; + + do { + next = work->next; + work->func(work); + work = next; + cond_resched(); + } while (work); + } while (1); +} + static void io_ring_exit_work(struct work_struct *work) { struct io_ring_ctx *ctx = container_of(work, struct io_ring_ctx, @@ -8846,17 +8547,11 @@ static void io_ring_exit_work(struct work_struct *work) */ do { io_uring_try_cancel_requests(ctx, NULL, NULL); + io_run_ctx_fallback(ctx); } while (!wait_for_completion_timeout(&ctx->ref_comp, HZ/20)); io_ring_ctx_free(ctx); } -static bool io_cancel_ctx_cb(struct io_wq_work *work, void *data) -{ - struct io_kiocb *req = container_of(work, struct io_kiocb, work); - - return req->ctx == data; -} - static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx) { mutex_lock(&ctx->uring_lock); @@ -8875,9 +8570,6 @@ static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx) io_kill_timeouts(ctx, NULL, NULL); io_poll_remove_all(ctx, NULL, NULL); - if (ctx->io_wq) - io_wq_cancel_cb(ctx->io_wq, io_cancel_ctx_cb, ctx, true); - /* if we failed setting up the ctx, we might not have any rings */ io_iopoll_try_reap_events(ctx); @@ -8956,13 +8648,14 @@ static void io_uring_try_cancel_requests(struct io_ring_ctx *ctx, struct files_struct *files) { struct io_task_cancel cancel = { .task = task, .files = files, }; + struct io_uring_task *tctx = current->io_uring; while (1) { enum io_wq_cancel cret; bool ret = false; - if (ctx->io_wq) { - cret = io_wq_cancel_cb(ctx->io_wq, io_cancel_task_cb, + if (tctx && tctx->io_wq) { + cret = io_wq_cancel_cb(tctx->io_wq, io_cancel_task_cb, &cancel, true); ret |= (cret != IO_WQ_CANCEL_NOTFOUND); } @@ -9045,12 +8738,15 @@ static void io_uring_cancel_task_requests(struct io_ring_ctx *ctx, struct files_struct *files) { struct task_struct *task = current; + bool did_park = false; if ((ctx->flags & IORING_SETUP_SQPOLL) && ctx->sq_data) { io_disable_sqo_submit(ctx); - task = ctx->sq_data->thread; - atomic_inc(&task->io_uring->in_idle); - io_sq_thread_park(ctx->sq_data); + did_park = io_sq_thread_park(ctx->sq_data); + if (did_park) { + task = ctx->sq_data->thread; + atomic_inc(&task->io_uring->in_idle); + } } io_cancel_defer_files(ctx, task, files); @@ -9059,7 +8755,7 @@ static void io_uring_cancel_task_requests(struct io_ring_ctx *ctx, if (!files) io_uring_try_cancel_requests(ctx, task, NULL); - if ((ctx->flags & IORING_SETUP_SQPOLL) && ctx->sq_data) { + if (did_park) { atomic_dec(&task->io_uring->in_idle); io_sq_thread_unpark(ctx->sq_data); } @@ -9074,7 +8770,7 @@ static int io_uring_add_task_file(struct io_ring_ctx *ctx, struct file *file) int ret; if (unlikely(!tctx)) { - ret = io_uring_alloc_task_context(current); + ret = io_uring_alloc_task_context(current, ctx); if (unlikely(ret)) return ret; tctx = current->io_uring; @@ -9144,8 +8840,13 @@ void __io_uring_files_cancel(struct files_struct *files) io_uring_cancel_task_requests(file->private_data, files); atomic_dec(&tctx->in_idle); - if (files) + if (files) { io_uring_remove_task_files(tctx); + if (tctx->io_wq) { + io_wq_put(tctx->io_wq); + tctx->io_wq = NULL; + } + } } static s64 tctx_inflight(struct io_uring_task *tctx) @@ -9155,14 +8856,17 @@ static s64 tctx_inflight(struct io_uring_task *tctx) static void io_uring_cancel_sqpoll(struct io_ring_ctx *ctx) { + struct io_sq_data *sqd = ctx->sq_data; struct io_uring_task *tctx; s64 inflight; DEFINE_WAIT(wait); - if (!ctx->sq_data) + if (!sqd) return; - tctx = ctx->sq_data->thread->io_uring; io_disable_sqo_submit(ctx); + if (!io_sq_thread_park(sqd)) + return; + tctx = ctx->sq_data->thread->io_uring; atomic_inc(&tctx->in_idle); do { @@ -9183,6 +8887,7 @@ static void io_uring_cancel_sqpoll(struct io_ring_ctx *ctx) finish_wait(&tctx->wait, &wait); } while (1); atomic_dec(&tctx->in_idle); + io_sq_thread_unpark(sqd); } /* @@ -9236,11 +8941,17 @@ static int io_uring_flush(struct file *file, void *data) struct io_uring_task *tctx = current->io_uring; struct io_ring_ctx *ctx = file->private_data; + /* Ignore helper thread files exit */ + if (current->flags & PF_IO_WORKER) + return 0; + if (fatal_signal_pending(current) || (current->flags & PF_EXITING)) { io_uring_cancel_task_requests(ctx, NULL); io_req_caches_free(ctx, current); } + io_run_ctx_fallback(ctx); + if (!tctx) return 0; @@ -9439,6 +9150,12 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit, if (ctx->flags & IORING_SETUP_SQPOLL) { io_cqring_overflow_flush(ctx, false, NULL, NULL); + if (unlikely(ctx->sqo_exec)) { + ret = io_sq_thread_fork(ctx->sq_data, ctx); + if (ret) + goto out; + ctx->sqo_exec = 0; + } ret = -EOWNERDEAD; if (unlikely(ctx->sqo_dead)) goto out; @@ -9495,8 +9212,7 @@ out_fput: #ifdef CONFIG_PROC_FS static int io_uring_show_cred(int id, void *p, void *data) { - struct io_identity *iod = p; - const struct cred *cred = iod->creds; + const struct cred *cred = p; struct seq_file *m = data; struct user_namespace *uns = seq_user_ns(m); struct group_info *gi; @@ -9541,8 +9257,11 @@ static void __io_uring_show_fdinfo(struct io_ring_ctx *ctx, struct seq_file *m) */ has_lock = mutex_trylock(&ctx->uring_lock); - if (has_lock && (ctx->flags & IORING_SETUP_SQPOLL)) + if (has_lock && (ctx->flags & IORING_SETUP_SQPOLL)) { sq = ctx->sq_data; + if (!sq->thread) + sq = NULL; + } seq_printf(m, "SqThread:\t%d\n", sq ? task_pid_nr(sq->thread) : -1); seq_printf(m, "SqThreadCpu:\t%d\n", sq ? task_cpu(sq->thread) : -1); @@ -9702,7 +9421,6 @@ static struct file *io_uring_get_file(struct io_ring_ctx *ctx) static int io_uring_create(unsigned entries, struct io_uring_params *p, struct io_uring_params __user *params) { - struct user_struct *user = NULL; struct io_ring_ctx *ctx; struct file *file; int ret; @@ -9744,22 +9462,13 @@ static int io_uring_create(unsigned entries, struct io_uring_params *p, p->cq_entries = 2 * p->sq_entries; } - user = get_uid(current_user()); - ctx = io_ring_ctx_alloc(p); - if (!ctx) { - free_uid(user); + if (!ctx) return -ENOMEM; - } ctx->compat = in_compat_syscall(); - ctx->limit_mem = !capable(CAP_IPC_LOCK); - ctx->user = user; - ctx->creds = get_current_cred(); -#ifdef CONFIG_AUDIT - ctx->loginuid = current->loginuid; - ctx->sessionid = current->sessionid; -#endif - ctx->sqo_task = get_task_struct(current); + if (!capable(CAP_IPC_LOCK)) + ctx->user = get_uid(current_user()); + ctx->sqo_task = current; /* * This is just grabbed for accounting purposes. When a process exits, @@ -9770,24 +9479,6 @@ static int io_uring_create(unsigned entries, struct io_uring_params *p, mmgrab(current->mm); ctx->mm_account = current->mm; -#ifdef CONFIG_BLK_CGROUP - /* - * The sq thread will belong to the original cgroup it was inited in. - * If the cgroup goes offline (e.g. disabling the io controller), then - * issued bios will be associated with the closest cgroup later in the - * block layer. - */ - rcu_read_lock(); - ctx->sqo_blkcg_css = blkcg_css(); - ret = css_tryget_online(ctx->sqo_blkcg_css); - rcu_read_unlock(); - if (!ret) { - /* don't init against a dying cgroup, have the user try again */ - ctx->sqo_blkcg_css = NULL; - ret = -ENODEV; - goto err; - } -#endif ret = io_allocate_scq_urings(ctx, p); if (ret) goto err; @@ -9821,7 +9512,7 @@ static int io_uring_create(unsigned entries, struct io_uring_params *p, IORING_FEAT_SUBMIT_STABLE | IORING_FEAT_RW_CUR_POS | IORING_FEAT_CUR_PERSONALITY | IORING_FEAT_FAST_POLL | IORING_FEAT_POLL_32BITS | IORING_FEAT_SQPOLL_NONFIXED | - IORING_FEAT_EXT_ARG; + IORING_FEAT_EXT_ARG | IORING_FEAT_NATIVE_WORKERS; if (copy_to_user(params, p, sizeof(*p))) { ret = -EFAULT; @@ -9927,21 +9618,15 @@ out: static int io_register_personality(struct io_ring_ctx *ctx) { - struct io_identity *id; + const struct cred *creds; int ret; - id = kmalloc(sizeof(*id), GFP_KERNEL); - if (unlikely(!id)) - return -ENOMEM; - - io_init_identity(id); - id->creds = get_current_cred(); + creds = get_current_cred(); - ret = idr_alloc_cyclic(&ctx->personality_idr, id, 1, USHRT_MAX, GFP_KERNEL); - if (ret < 0) { - put_cred(id->creds); - kfree(id); - } + ret = idr_alloc_cyclic(&ctx->personality_idr, (void *) creds, 1, + USHRT_MAX, GFP_KERNEL); + if (ret < 0) + put_cred(creds); return ret; } diff --git a/fs/proc/self.c b/fs/proc/self.c index a4012154e109..72cd69bcaf4a 100644 --- a/fs/proc/self.c +++ b/fs/proc/self.c @@ -16,13 +16,6 @@ static const char *proc_self_get_link(struct dentry *dentry, pid_t tgid = task_tgid_nr_ns(current, ns); char *name; - /* - * Not currently supported. Once we can inherit all of struct pid, - * we can allow this. - */ - if (current->flags & PF_IO_WORKER) - return ERR_PTR(-EOPNOTSUPP); - if (!tgid) return ERR_PTR(-ENOENT); /* max length of unsigned int in decimal + NULL term */ diff --git a/fs/proc/thread_self.c b/fs/proc/thread_self.c index d56681d86d28..a553273fbd41 100644 --- a/fs/proc/thread_self.c +++ b/fs/proc/thread_self.c @@ -17,13 +17,6 @@ static const char *proc_thread_self_get_link(struct dentry *dentry, pid_t pid = task_pid_nr_ns(current, ns); char *name; - /* - * Not currently supported. Once we can inherit all of struct pid, - * we can allow this. - */ - if (current->flags & PF_IO_WORKER) - return ERR_PTR(-EOPNOTSUPP); - if (!pid) return ERR_PTR(-ENOENT); name = kmalloc(10 + 6 + 10 + 1, dentry ? GFP_KERNEL : GFP_ATOMIC); diff --git a/include/linux/io_uring.h b/include/linux/io_uring.h index 2eb6d19de336..51ede771cd99 100644 --- a/include/linux/io_uring.h +++ b/include/linux/io_uring.h @@ -5,23 +5,6 @@ #include <linux/sched.h> #include <linux/xarray.h> -struct io_identity { - struct files_struct *files; - struct mm_struct *mm; -#ifdef CONFIG_BLK_CGROUP - struct cgroup_subsys_state *blkcg_css; -#endif - const struct cred *creds; - struct nsproxy *nsproxy; - struct fs_struct *fs; - unsigned long fsize; -#ifdef CONFIG_AUDIT - kuid_t loginuid; - unsigned int sessionid; -#endif - refcount_t count; -}; - struct io_wq_work_node { struct io_wq_work_node *next; }; @@ -36,9 +19,8 @@ struct io_uring_task { struct xarray xa; struct wait_queue_head wait; struct file *last; + void *io_wq; struct percpu_counter inflight; - struct io_identity __identity; - struct io_identity *identity; atomic_t in_idle; bool sqpoll; @@ -61,7 +43,7 @@ static inline void io_uring_task_cancel(void) } static inline void io_uring_files_cancel(struct files_struct *files) { - if (current->io_uring && !xa_empty(¤t->io_uring->xa)) + if (current->io_uring) __io_uring_files_cancel(files); } static inline void io_uring_free(struct task_struct *tsk) diff --git a/include/linux/net.h b/include/linux/net.h index 9e2324efc26a..ba736b457a06 100644 --- a/include/linux/net.h +++ b/include/linux/net.h @@ -42,8 +42,6 @@ struct net; #define SOCK_PASSCRED 3 #define SOCK_PASSSEC 4 -#define PROTO_CMSG_DATA_ONLY 0x0001 - #ifndef ARCH_HAS_SOCKET_TYPES /** * enum sock_type - Socket types @@ -138,7 +136,6 @@ typedef int (*sk_read_actor_t)(read_descriptor_t *, struct sk_buff *, struct proto_ops { int family; - unsigned int flags; struct module *owner; int (*release) (struct socket *sock); int (*bind) (struct socket *sock, diff --git a/include/linux/sched.h b/include/linux/sched.h index 26f499810dfa..ef00bb22164c 100644 --- a/include/linux/sched.h +++ b/include/linux/sched.h @@ -895,6 +895,9 @@ struct task_struct { /* CLONE_CHILD_CLEARTID: */ int __user *clear_child_tid; + /* PF_IO_WORKER */ + void *pf_io_worker; + u64 utime; u64 stime; #ifdef CONFIG_ARCH_HAS_SCALED_CPUTIME diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h index ac4e1738a9af..2514eb6b1cf2 100644 --- a/include/uapi/linux/io_uring.h +++ b/include/uapi/linux/io_uring.h @@ -262,6 +262,7 @@ struct io_uring_params { #define IORING_FEAT_POLL_32BITS (1U << 6) #define IORING_FEAT_SQPOLL_NONFIXED (1U << 7) #define IORING_FEAT_EXT_ARG (1U << 8) +#define IORING_FEAT_NATIVE_WORKERS (1U << 9) /* * io_uring_register(2) opcodes and arguments diff --git a/kernel/ptrace.c b/kernel/ptrace.c index 61db50f7ca86..821cf1723814 100644 --- a/kernel/ptrace.c +++ b/kernel/ptrace.c @@ -375,7 +375,7 @@ static int ptrace_attach(struct task_struct *task, long request, audit_ptrace(task); retval = -EPERM; - if (unlikely(task->flags & PF_KTHREAD)) + if (unlikely(task->flags & (PF_KTHREAD | PF_IO_WORKER))) goto out; if (same_thread_group(task, current)) goto out; diff --git a/kernel/signal.c b/kernel/signal.c index 5ad8566534e7..ba4d1ef39a9e 100644 --- a/kernel/signal.c +++ b/kernel/signal.c @@ -91,7 +91,7 @@ static bool sig_task_ignored(struct task_struct *t, int sig, bool force) return true; /* Only allow kernel generated signals to this kthread */ - if (unlikely((t->flags & PF_KTHREAD) && + if (unlikely((t->flags & (PF_KTHREAD | PF_IO_WORKER)) && (handler == SIG_KTHREAD_KERNEL) && !force)) return true; @@ -1096,7 +1096,7 @@ static int __send_signal(int sig, struct kernel_siginfo *info, struct task_struc /* * Skip useless siginfo allocation for SIGKILL and kernel threads. */ - if ((sig == SIGKILL) || (t->flags & PF_KTHREAD)) + if ((sig == SIGKILL) || (t->flags & (PF_KTHREAD | PF_IO_WORKER))) goto out_set; /* diff --git a/net/ipv4/af_inet.c b/net/ipv4/af_inet.c index a02ce89b56b5..1355e6c0d567 100644 --- a/net/ipv4/af_inet.c +++ b/net/ipv4/af_inet.c @@ -1021,7 +1021,6 @@ static int inet_compat_ioctl(struct socket *sock, unsigned int cmd, unsigned lon const struct proto_ops inet_stream_ops = { .family = PF_INET, - .flags = PROTO_CMSG_DATA_ONLY, .owner = THIS_MODULE, .release = inet_release, .bind = inet_bind, diff --git a/net/ipv6/af_inet6.c b/net/ipv6/af_inet6.c index 1fb75f01756c..802f5111805a 100644 --- a/net/ipv6/af_inet6.c +++ b/net/ipv6/af_inet6.c @@ -665,7 +665,6 @@ int inet6_recvmsg(struct socket *sock, struct msghdr *msg, size_t size, const struct proto_ops inet6_stream_ops = { .family = PF_INET6, - .flags = PROTO_CMSG_DATA_ONLY, .owner = THIS_MODULE, .release = inet6_release, .bind = inet6_bind, diff --git a/net/socket.c b/net/socket.c index 23c7842389de..84a8049c2b09 100644 --- a/net/socket.c +++ b/net/socket.c @@ -2413,10 +2413,6 @@ static int ___sys_sendmsg(struct socket *sock, struct user_msghdr __user *msg, long __sys_sendmsg_sock(struct socket *sock, struct msghdr *msg, unsigned int flags) { - /* disallow ancillary data requests from this path */ - if (msg->msg_control || msg->msg_controllen) - return -EINVAL; - return ____sys_sendmsg(sock, msg, flags, NULL, 0); } @@ -2625,12 +2621,6 @@ long __sys_recvmsg_sock(struct socket *sock, struct msghdr *msg, struct user_msghdr __user *umsg, struct sockaddr __user *uaddr, unsigned int flags) { - if (msg->msg_control || msg->msg_controllen) { - /* disallow ancillary data reqs unless cmsg is plain data */ - if (!(sock->ops->flags & PROTO_CMSG_DATA_ONLY)) - return -EINVAL; - } - return ____sys_recvmsg(sock, msg, umsg, uaddr, flags, 0); } |