Diffstat (limited to 'fs/io-wq.c')
-rw-r--r-- | fs/io-wq.c | 261
1 file changed, 121 insertions, 140 deletions
diff --git a/fs/io-wq.c b/fs/io-wq.c
index 44e20248805a..28868eb4cd09 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -16,6 +16,7 @@
 #include <linux/rculist_nulls.h>
 #include <linux/cpu.h>
 #include <linux/tracehook.h>
+#include <linux/freezer.h>
 
 #include "../kernel/sched/sched.h"
 #include "io-wq.h"
@@ -52,9 +53,6 @@ struct io_worker {
 	struct io_wq_work *cur_work;
 	spinlock_t lock;
 
-	const struct cred *cur_creds;
-	const struct cred *saved_creds;
-
 	struct completion ref_done;
 
 	struct rcu_head rcu;
@@ -117,7 +115,10 @@ struct io_wq {
 	struct io_wq_hash *hash;
 
 	refcount_t refs;
-	struct completion done;
+	struct completion exited;
+
+	atomic_t worker_refs;
+	struct completion worker_done;
 
 	struct hlist_node cpuhp_node;
 
@@ -126,6 +127,17 @@ struct io_wq {
 
 static enum cpuhp_state io_wq_online;
 
+struct io_cb_cancel_data {
+	work_cancel_fn *fn;
+	void *data;
+	int nr_running;
+	int nr_pending;
+	bool cancel_all;
+};
+
+static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
+				       struct io_cb_cancel_data *match);
+
 static bool io_worker_get(struct io_worker *worker)
 {
 	return refcount_inc_not_zero(&worker->ref);
@@ -175,11 +187,6 @@ static void io_worker_exit(struct io_worker *worker)
 	worker->flags = 0;
 	preempt_enable();
 
-	if (worker->saved_creds) {
-		revert_creds(worker->saved_creds);
-		worker->cur_creds = worker->saved_creds = NULL;
-	}
-
 	raw_spin_lock_irq(&wqe->lock);
 	if (flags & IO_WORKER_F_FREE)
 		hlist_nulls_del_rcu(&worker->nulls_node);
@@ -188,7 +195,9 @@ static void io_worker_exit(struct io_worker *worker)
 	raw_spin_unlock_irq(&wqe->lock);
 
 	kfree_rcu(worker, rcu);
-	io_wq_put(wqe->wq);
+	if (atomic_dec_and_test(&wqe->wq->worker_refs))
+		complete(&wqe->wq->worker_done);
+	do_exit(0);
 }
 
 static inline bool io_wqe_run_queue(struct io_wqe *wqe)
@@ -263,12 +272,6 @@ static void io_wqe_dec_running(struct io_worker *worker)
 		io_wqe_wake_worker(wqe, acct);
 }
 
-static void io_worker_start(struct io_worker *worker)
-{
-	worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
-	io_wqe_inc_running(worker);
-}
-
 /*
  * Worker will start processing some work. Move it to the busy list, if
  * it's currently on the freelist
@@ -319,10 +322,6 @@ static void __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
 		worker->flags |= IO_WORKER_F_FREE;
 		hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
 	}
-	if (worker->saved_creds) {
-		revert_creds(worker->saved_creds);
-		worker->cur_creds = worker->saved_creds = NULL;
-	}
 }
 
 static inline unsigned int io_get_work_hash(struct io_wq_work *work)
@@ -397,18 +396,6 @@ static void io_flush_signals(void)
 	}
 }
 
-static void io_wq_switch_creds(struct io_worker *worker,
-			       struct io_wq_work *work)
-{
-	const struct cred *old_creds = override_creds(work->creds);
-
-	worker->cur_creds = work->creds;
-	if (worker->saved_creds)
-		put_cred(old_creds); /* creds set by previous switch */
-	else
-		worker->saved_creds = old_creds;
-}
-
 static void io_assign_current_work(struct io_worker *worker,
 				   struct io_wq_work *work)
 {
@@ -458,8 +445,6 @@ get_next:
 			unsigned int hash = io_get_work_hash(work);
 
 			next_hashed = wq_next_work(work);
-			if (work->creds && worker->cur_creds != work->creds)
-				io_wq_switch_creds(worker, work);
 			wq->do_work(work);
 			io_assign_current_work(worker, NULL);
 
@@ -495,8 +480,13 @@ static int io_wqe_worker(void *data)
 	struct io_worker *worker = data;
 	struct io_wqe *wqe = worker->wqe;
 	struct io_wq *wq = wqe->wq;
+	char buf[TASK_COMM_LEN];
+
+	worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
+	io_wqe_inc_running(worker);
 
-	io_worker_start(worker);
+	sprintf(buf, "iou-wrk-%d", wq->task_pid);
+	set_task_comm(current, buf);
 
 	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
 		set_current_state(TASK_INTERRUPTIBLE);
@@ -571,67 +561,11 @@ void io_wq_worker_sleeping(struct task_struct *tsk)
 	raw_spin_unlock_irq(&worker->wqe->lock);
 }
 
-static int task_thread(void *data, int index)
-{
-	struct io_worker *worker = data;
-	struct io_wqe *wqe = worker->wqe;
-	struct io_wqe_acct *acct = &wqe->acct[index];
-	struct io_wq *wq = wqe->wq;
-	char buf[TASK_COMM_LEN];
-
-	sprintf(buf, "iou-wrk-%d", wq->task_pid);
-	set_task_comm(current, buf);
-
-	current->pf_io_worker = worker;
-	worker->task = current;
-
-	set_cpus_allowed_ptr(current, cpumask_of_node(wqe->node));
-	current->flags |= PF_NO_SETAFFINITY;
-
-	raw_spin_lock_irq(&wqe->lock);
-	hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
-	list_add_tail_rcu(&worker->all_list, &wqe->all_list);
-	worker->flags |= IO_WORKER_F_FREE;
-	if (index == IO_WQ_ACCT_BOUND)
-		worker->flags |= IO_WORKER_F_BOUND;
-	if (!acct->nr_workers && (worker->flags & IO_WORKER_F_BOUND))
-		worker->flags |= IO_WORKER_F_FIXED;
-	acct->nr_workers++;
-	raw_spin_unlock_irq(&wqe->lock);
-
-	io_wqe_worker(data);
-	do_exit(0);
-}
-
-static int task_thread_bound(void *data)
-{
-	return task_thread(data, IO_WQ_ACCT_BOUND);
-}
-
-static int task_thread_unbound(void *data)
-{
-	return task_thread(data, IO_WQ_ACCT_UNBOUND);
-}
-
-pid_t io_wq_fork_thread(int (*fn)(void *), void *arg)
-{
-	unsigned long flags = CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|
-				CLONE_IO|SIGCHLD;
-	struct kernel_clone_args args = {
-		.flags = ((lower_32_bits(flags) | CLONE_VM |
-			    CLONE_UNTRACED) & ~CSIGNAL),
-		.exit_signal = (lower_32_bits(flags) & CSIGNAL),
-		.stack = (unsigned long)fn,
-		.stack_size = (unsigned long)arg,
-	};
-
-	return kernel_clone(&args);
-}
-
 static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
 {
+	struct io_wqe_acct *acct = &wqe->acct[index];
 	struct io_worker *worker;
-	pid_t pid;
+	struct task_struct *tsk;
 
 	__set_current_state(TASK_RUNNING);
 
@@ -645,17 +579,32 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
 	spin_lock_init(&worker->lock);
 	init_completion(&worker->ref_done);
 
-	refcount_inc(&wq->refs);
+	atomic_inc(&wq->worker_refs);
 
-	if (index == IO_WQ_ACCT_BOUND)
-		pid = io_wq_fork_thread(task_thread_bound, worker);
-	else
-		pid = io_wq_fork_thread(task_thread_unbound, worker);
-	if (pid < 0) {
-		io_wq_put(wq);
+	tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
+	if (IS_ERR(tsk)) {
+		if (atomic_dec_and_test(&wq->worker_refs))
+			complete(&wq->worker_done);
 		kfree(worker);
 		return false;
 	}
+
+	tsk->pf_io_worker = worker;
+	worker->task = tsk;
+	set_cpus_allowed_ptr(tsk, cpumask_of_node(wqe->node));
+	tsk->flags |= PF_NOFREEZE | PF_NO_SETAFFINITY;
+
+	raw_spin_lock_irq(&wqe->lock);
+	hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
+	list_add_tail_rcu(&worker->all_list, &wqe->all_list);
+	worker->flags |= IO_WORKER_F_FREE;
+	if (index == IO_WQ_ACCT_BOUND)
+		worker->flags |= IO_WORKER_F_BOUND;
+	if (!acct->nr_workers && (worker->flags & IO_WORKER_F_BOUND))
+		worker->flags |= IO_WORKER_F_FIXED;
+	acct->nr_workers++;
+	raw_spin_unlock_irq(&wqe->lock);
+	wake_up_new_task(tsk);
 	return true;
 }
 
@@ -664,6 +613,8 @@ static inline bool io_wqe_need_worker(struct io_wqe *wqe, int index)
 {
 	struct io_wqe_acct *acct = &wqe->acct[index];
 
+	if (acct->nr_workers && test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state))
+		return false;
 	/* if we have available workers or no work, no need */
 	if (!hlist_nulls_empty(&wqe->free_list) || !io_wqe_run_queue(wqe))
 		return false;
@@ -697,6 +648,7 @@ static bool io_wq_for_each_worker(struct io_wqe *wqe,
 
 static bool io_wq_worker_wake(struct io_worker *worker, void *data)
 {
+	set_notify_signal(worker->task);
 	wake_up_process(worker->task);
 	return false;
 }
@@ -725,6 +677,23 @@ static void io_wq_check_workers(struct io_wq *wq)
 	}
 }
 
+static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
+{
+	return true;
+}
+
+static void io_wq_cancel_pending(struct io_wq *wq)
+{
+	struct io_cb_cancel_data match = {
+		.fn		= io_wq_work_match_all,
+		.cancel_all	= true,
+	};
+	int node;
+
+	for_each_node(node)
+		io_wqe_cancel_pending_work(wq->wqes[node], &match);
+}
+
 /*
  * Manager thread. Tasked with creating new workers, if we need them.
  */
@@ -732,25 +701,38 @@ static int io_wq_manager(void *data)
 {
 	struct io_wq *wq = data;
 	char buf[TASK_COMM_LEN];
+	int node;
 
 	sprintf(buf, "iou-mgr-%d", wq->task_pid);
 	set_task_comm(current, buf);
-	current->flags |= PF_IO_WORKER;
-	wq->manager = current;
-
-	complete(&wq->done);
 
 	do {
 		set_current_state(TASK_INTERRUPTIBLE);
 		io_wq_check_workers(wq);
 		schedule_timeout(HZ);
+		try_to_freeze();
 		if (fatal_signal_pending(current))
 			set_bit(IO_WQ_BIT_EXIT, &wq->state);
 	} while (!test_bit(IO_WQ_BIT_EXIT, &wq->state));
 
 	io_wq_check_workers(wq);
-	wq->manager = NULL;
-	io_wq_put(wq);
+
+	rcu_read_lock();
+	for_each_node(node)
+		io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
+	rcu_read_unlock();
+
+	/* we might not ever have created any workers */
+	if (atomic_read(&wq->worker_refs))
+		wait_for_completion(&wq->worker_done);
+
+	spin_lock_irq(&wq->hash->wait.lock);
+	for_each_node(node)
+		list_del_init(&wq->wqes[node]->wait.entry);
+	spin_unlock_irq(&wq->hash->wait.lock);
+
+	io_wq_cancel_pending(wq);
+	complete(&wq->exited);
 	do_exit(0);
 }
 
@@ -787,23 +769,20 @@ append:
 
 static int io_wq_fork_manager(struct io_wq *wq)
 {
-	int ret;
+	struct task_struct *tsk;
 
 	if (wq->manager)
 		return 0;
 
-	clear_bit(IO_WQ_BIT_EXIT, &wq->state);
-	refcount_inc(&wq->refs);
-	current->flags |= PF_IO_WORKER;
-	ret = io_wq_fork_thread(io_wq_manager, wq);
-	current->flags &= ~PF_IO_WORKER;
-	if (ret >= 0) {
-		wait_for_completion(&wq->done);
+	reinit_completion(&wq->worker_done);
+	tsk = create_io_thread(io_wq_manager, wq, NUMA_NO_NODE);
+	if (!IS_ERR(tsk)) {
+		wq->manager = get_task_struct(tsk);
+		wake_up_new_task(tsk);
 		return 0;
 	}
 
-	io_wq_put(wq);
-	return ret;
+	return PTR_ERR(tsk);
 }
 
 static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
@@ -813,7 +792,8 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
 	unsigned long flags;
 
 	/* Can only happen if manager creation fails after exec */
-	if (unlikely(io_wq_fork_manager(wqe->wq))) {
+	if (io_wq_fork_manager(wqe->wq) ||
+	    test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state)) {
 		work->flags |= IO_WQ_WORK_CANCEL;
 		wqe->wq->do_work(work);
 		return;
@@ -849,14 +829,6 @@ void io_wq_hash_work(struct io_wq_work *work, void *val)
 	work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
 }
 
-struct io_cb_cancel_data {
-	work_cancel_fn *fn;
-	void *data;
-	int nr_running;
-	int nr_pending;
-	bool cancel_all;
-};
-
 static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
 {
 	struct io_cb_cancel_data *match = data;
@@ -1043,16 +1015,18 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
 	}
 
 	wq->task_pid = current->pid;
-	init_completion(&wq->done);
+	init_completion(&wq->exited);
 	refcount_set(&wq->refs, 1);
 
+	init_completion(&wq->worker_done);
+	atomic_set(&wq->worker_refs, 0);
+
 	ret = io_wq_fork_manager(wq);
 	if (!ret)
 		return wq;
 
-	io_wq_put(wq);
-	io_wq_put_hash(data->hash);
 err:
+	io_wq_put_hash(data->hash);
 	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
 	for_each_node(node)
 		kfree(wq->wqes[node]);
@@ -1063,6 +1037,16 @@ err_wq:
 	return ERR_PTR(ret);
 }
 
+static void io_wq_destroy_manager(struct io_wq *wq)
+{
+	if (wq->manager) {
+		wake_up_process(wq->manager);
+		wait_for_completion(&wq->exited);
+		put_task_struct(wq->manager);
+		wq->manager = NULL;
+	}
+}
+
 static void io_wq_destroy(struct io_wq *wq)
 {
 	int node;
@@ -1070,26 +1054,16 @@ static void io_wq_destroy(struct io_wq *wq)
 	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
 
 	set_bit(IO_WQ_BIT_EXIT, &wq->state);
-	if (wq->manager)
-		wake_up_process(wq->manager);
-
-	rcu_read_lock();
-	for_each_node(node)
-		io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
-	rcu_read_unlock();
+	io_wq_destroy_manager(wq);
 
-	spin_lock_irq(&wq->hash->wait.lock);
 	for_each_node(node) {
 		struct io_wqe *wqe = wq->wqes[node];
-
-		list_del_init(&wqe->wait.entry);
+		WARN_ON_ONCE(!wq_list_empty(&wqe->work_list));
 		kfree(wqe);
 	}
-	spin_unlock_irq(&wq->hash->wait.lock);
 	io_wq_put_hash(wq->hash);
 	kfree(wq->wqes);
 	kfree(wq);
-
 }
 
 void io_wq_put(struct io_wq *wq)
@@ -1098,6 +1072,13 @@ void io_wq_put(struct io_wq *wq)
 		io_wq_destroy(wq);
 }
 
+void io_wq_put_and_exit(struct io_wq *wq)
+{
+	set_bit(IO_WQ_BIT_EXIT, &wq->state);
+	io_wq_destroy_manager(wq);
+	io_wq_put(wq);
+}
+
 static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
 {
 	struct task_struct *task = worker->task;
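The heart of this change is the shutdown handshake that replaces the old wq->refs/wq->done scheme: every worker pins the queue via wq->worker_refs, and the last worker to drop its reference fires complete(&wq->worker_done), which the manager blocks on before cancelling pending work and completing &wq->exited. Below is a minimal userspace sketch of that last-one-out idiom, assuming C11 atomics and pthreads as stand-ins for the kernel's atomic_t and struct completion; wq_sim, put_worker_ref, and worker_main are invented names for illustration, not io-wq API.

/* Userspace sketch only -- not kernel code. Models the worker_refs /
 * worker_done shutdown handshake from the diff above with C11 atomics
 * and a pthread condition variable standing in for struct completion. */
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>

#define NR_WORKERS 3

struct wq_sim {
	atomic_int worker_refs;		/* plays the role of wq->worker_refs */
	pthread_mutex_t lock;
	pthread_cond_t worker_done;	/* plays the role of wq->worker_done */
	int done;
};

/* Like io_worker_exit(): drop our ref; the last worker out signals the waiter. */
static void put_worker_ref(struct wq_sim *wq)
{
	if (atomic_fetch_sub(&wq->worker_refs, 1) == 1) {
		pthread_mutex_lock(&wq->lock);
		wq->done = 1;
		pthread_cond_signal(&wq->worker_done);
		pthread_mutex_unlock(&wq->lock);
	}
}

static void *worker_main(void *arg)
{
	struct wq_sim *wq = arg;

	/* ... a real worker would pull work off its work list here ... */
	put_worker_ref(wq);
	return NULL;
}

int main(void)
{
	struct wq_sim wq = {
		.worker_refs = NR_WORKERS,
		.lock = PTHREAD_MUTEX_INITIALIZER,
		.worker_done = PTHREAD_COND_INITIALIZER,
	};
	pthread_t threads[NR_WORKERS];

	for (int i = 0; i < NR_WORKERS; i++)
		pthread_create(&threads[i], NULL, worker_main, &wq);

	/* Manager side: the wait_for_completion(&wq->worker_done) analogue. */
	pthread_mutex_lock(&wq.lock);
	while (!wq.done)
		pthread_cond_wait(&wq.worker_done, &wq.lock);
	pthread_mutex_unlock(&wq.lock);

	for (int i = 0; i < NR_WORKERS; i++)
		pthread_join(threads[i], NULL);
	printf("all %d workers exited\n", NR_WORKERS);
	return 0;
}

The kernel version additionally guards the wait with atomic_read(&wq->worker_refs), since the manager may exit before any worker was ever created; the condition-variable loop above covers that case naturally because done is checked before sleeping.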