diff options
author | Paul E. McKenney | 2021-11-10 15:56:40 -0800 |
---|---|---|
committer | Paul E. McKenney | 2021-12-09 10:52:00 -0800 |
commit | d363f833c6d88331ff013ff0970a96caa8b84653 (patch) | |
tree | 881043075fd4d35736d7124a10ee0d8d1c46b732 /kernel/rcu | |
parent | 57881863ad15fbccbfa637b5e4b67cd3a4520643 (diff) |
rcu-tasks: Use workqueues for multiple rcu_tasks_invoke_cbs() invocations
If there is a flood of callbacks, it is necessary to put multiple
CPUs to work invoking those callbacks. This commit therefore uses a
workqueue-flooding approach to parallelize RCU Tasks callback execution.
Reported-by: Martin Lau <kafai@fb.com>
Cc: Neeraj Upadhyay <neeraj.iitr10@gmail.com>
Signed-off-by: Paul E. McKenney <paulmck@kernel.org>
Diffstat (limited to 'kernel/rcu')
-rw-r--r-- | kernel/rcu/tasks.h | 75 |
1 files changed, 52 insertions, 23 deletions
diff --git a/kernel/rcu/tasks.h b/kernel/rcu/tasks.h index 6b3406a47720..97bd7a667e94 100644 --- a/kernel/rcu/tasks.h +++ b/kernel/rcu/tasks.h @@ -24,10 +24,14 @@ typedef void (*postgp_func_t)(struct rcu_tasks *rtp); * struct rcu_tasks_percpu - Per-CPU component of definition for a Tasks-RCU-like mechanism. * @cblist: Callback list. * @lock: Lock protecting per-CPU callback list. + * @rtp_work: Work queue for invoking callbacks. */ struct rcu_tasks_percpu { struct rcu_segcblist cblist; raw_spinlock_t __private lock; + struct work_struct rtp_work; + int cpu; + struct rcu_tasks *rtpp; }; /** @@ -146,6 +150,8 @@ static const char * const rcu_tasks_gp_state_names[] = { // // Generic code. +static void rcu_tasks_invoke_cbs_wq(struct work_struct *wp); + /* Record grace-period phase and time. */ static void set_tasks_gp_state(struct rcu_tasks *rtp, int newstate) { @@ -185,6 +191,9 @@ static void cblist_init_generic(struct rcu_tasks *rtp) raw_spin_lock_rcu_node(rtpcp); // irqs already disabled. if (rcu_segcblist_empty(&rtpcp->cblist)) rcu_segcblist_init(&rtpcp->cblist); + INIT_WORK(&rtpcp->rtp_work, rcu_tasks_invoke_cbs_wq); + rtpcp->cpu = cpu; + rtpcp->rtpp = rtp; raw_spin_unlock_rcu_node(rtpcp); // irqs remain disabled. } raw_spin_unlock_irqrestore(&rtp->cbs_gbl_lock, flags); @@ -256,35 +265,55 @@ static int rcu_tasks_need_gpcb(struct rcu_tasks *rtp) } // Advance callbacks and invoke any that are ready. -static void rcu_tasks_invoke_cbs(struct rcu_tasks *rtp) +static void rcu_tasks_invoke_cbs(struct rcu_tasks *rtp, struct rcu_tasks_percpu *rtpcp) { int cpu; + int cpunext; unsigned long flags; int len; - struct rcu_cblist rcl = RCU_CBLIST_INITIALIZER(rcl); struct rcu_head *rhp; - - for (cpu = 0; cpu < rtp->percpu_enqueue_lim; cpu++) { - struct rcu_tasks_percpu *rtpcp = per_cpu_ptr(rtp->rtpcpu, cpu); - - if (rcu_segcblist_empty(&rtpcp->cblist)) - continue; - raw_spin_lock_irqsave_rcu_node(rtpcp, flags); - rcu_segcblist_advance(&rtpcp->cblist, rcu_seq_current(&rtp->tasks_gp_seq)); - rcu_segcblist_extract_done_cbs(&rtpcp->cblist, &rcl); - raw_spin_unlock_irqrestore_rcu_node(rtpcp, flags); - len = rcl.len; - for (rhp = rcu_cblist_dequeue(&rcl); rhp; rhp = rcu_cblist_dequeue(&rcl)) { - local_bh_disable(); - rhp->func(rhp); - local_bh_enable(); - cond_resched(); + struct rcu_cblist rcl = RCU_CBLIST_INITIALIZER(rcl); + struct rcu_tasks_percpu *rtpcp_next; + + cpu = rtpcp->cpu; + cpunext = cpu * 2 + 1; + if (cpunext < rtp->percpu_enqueue_lim) { + rtpcp_next = per_cpu_ptr(rtp->rtpcpu, cpunext); + queue_work_on(cpunext, system_wq, &rtpcp_next->rtp_work); + cpunext++; + if (cpunext < rtp->percpu_enqueue_lim) { + rtpcp_next = per_cpu_ptr(rtp->rtpcpu, cpunext); + queue_work_on(cpunext, system_wq, &rtpcp_next->rtp_work); } - raw_spin_lock_irqsave_rcu_node(rtpcp, flags); - rcu_segcblist_add_len(&rtpcp->cblist, -len); - (void)rcu_segcblist_accelerate(&rtpcp->cblist, rcu_seq_snap(&rtp->tasks_gp_seq)); - raw_spin_unlock_irqrestore_rcu_node(rtpcp, flags); } + + if (rcu_segcblist_empty(&rtpcp->cblist)) + return; + raw_spin_lock_irqsave_rcu_node(rtpcp, flags); + rcu_segcblist_advance(&rtpcp->cblist, rcu_seq_current(&rtp->tasks_gp_seq)); + rcu_segcblist_extract_done_cbs(&rtpcp->cblist, &rcl); + raw_spin_unlock_irqrestore_rcu_node(rtpcp, flags); + len = rcl.len; + for (rhp = rcu_cblist_dequeue(&rcl); rhp; rhp = rcu_cblist_dequeue(&rcl)) { + local_bh_disable(); + rhp->func(rhp); + local_bh_enable(); + cond_resched(); + } + raw_spin_lock_irqsave_rcu_node(rtpcp, flags); + rcu_segcblist_add_len(&rtpcp->cblist, -len); + (void)rcu_segcblist_accelerate(&rtpcp->cblist, rcu_seq_snap(&rtp->tasks_gp_seq)); + raw_spin_unlock_irqrestore_rcu_node(rtpcp, flags); +} + +// Workqueue flood to advance callbacks and invoke any that are ready. +static void rcu_tasks_invoke_cbs_wq(struct work_struct *wp) +{ + struct rcu_tasks *rtp; + struct rcu_tasks_percpu *rtpcp = container_of(wp, struct rcu_tasks_percpu, rtp_work); + + rtp = rtpcp->rtpp; + rcu_tasks_invoke_cbs(rtp, rtpcp); } /* RCU-tasks kthread that detects grace periods and invokes callbacks. */ @@ -320,7 +349,7 @@ static int __noreturn rcu_tasks_kthread(void *arg) /* Invoke callbacks. */ set_tasks_gp_state(rtp, RTGS_INVOKE_CBS); - rcu_tasks_invoke_cbs(rtp); + rcu_tasks_invoke_cbs(rtp, per_cpu_ptr(rtp->rtpcpu, 0)); /* Paranoid sleep to keep this from entering a tight loop */ schedule_timeout_idle(rtp->gp_sleep); |