diff options
author | Paul E. McKenney | 2019-05-15 09:56:40 -0700 |
---|---|---|
committer | Paul E. McKenney | 2019-08-13 14:35:49 -0700 |
commit | 5d6742b37727e111f4755155e59c5319cf5caa7b (patch) | |
tree | 88573ec286c5a5aa4d0366972fa265ff21ad34c4 /kernel/rcu/tree_plugin.h | |
parent | e83e73f5b0f8de6a8978ba64185e80fdf48a2a63 (diff) |
rcu/nocb: Use rcu_segcblist for no-CBs CPUs
Currently the RCU callbacks for no-CBs CPUs are queued on a series of
ad-hoc linked lists, which means that these callbacks cannot benefit
from "drive-by" grace periods, thus suffering needless delays prior
to invocation. In addition, the no-CBs grace-period kthreads first
wait for callbacks to appear and later wait for a new grace period,
which means that callbacks appearing during a grace-period wait can
be delayed. These delays increase memory footprint, and could even
result in an out-of-memory condition.
This commit therefore enqueues RCU callbacks from no-CBs CPUs on the
rcu_segcblist structure that is already used by non-no-CBs CPUs. It also
restructures the no-CBs grace-period kthread to be checking for incoming
callbacks while waiting for grace periods. Also, instead of waiting
for a new grace period, it waits for the closest grace period that will
cause some of the callbacks to be safe to invoke. All of these changes
reduce callback latency and thus the number of outstanding callbacks,
in turn reducing the probability of an out-of-memory condition.
Signed-off-by: Paul E. McKenney <paulmck@linux.ibm.com>
Diffstat (limited to 'kernel/rcu/tree_plugin.h')
-rw-r--r-- | kernel/rcu/tree_plugin.h | 510 |
1 files changed, 186 insertions, 324 deletions
diff --git a/kernel/rcu/tree_plugin.h b/kernel/rcu/tree_plugin.h index 2d37fd3fa0d4..feffc46cccb0 100644 --- a/kernel/rcu/tree_plugin.h +++ b/kernel/rcu/tree_plugin.h @@ -1495,6 +1495,45 @@ static int __init parse_rcu_nocb_poll(char *arg) early_param("rcu_nocb_poll", parse_rcu_nocb_poll); /* + * Acquire the specified rcu_data structure's ->nocb_lock, but only + * if it corresponds to a no-CBs CPU. + */ +static void rcu_nocb_lock(struct rcu_data *rdp) +{ + if (rcu_segcblist_is_offloaded(&rdp->cblist)) { + lockdep_assert_irqs_disabled(); + raw_spin_lock(&rdp->nocb_lock); + } +} + +/* + * Release the specified rcu_data structure's ->nocb_lock, but only + * if it corresponds to a no-CBs CPU. + */ +static void rcu_nocb_unlock(struct rcu_data *rdp) +{ + if (rcu_segcblist_is_offloaded(&rdp->cblist)) { + lockdep_assert_irqs_disabled(); + raw_spin_unlock(&rdp->nocb_lock); + } +} + +/* + * Release the specified rcu_data structure's ->nocb_lock and restore + * interrupts, but only if it corresponds to a no-CBs CPU. + */ +static void rcu_nocb_unlock_irqrestore(struct rcu_data *rdp, + unsigned long flags) +{ + if (rcu_segcblist_is_offloaded(&rdp->cblist)) { + lockdep_assert_irqs_disabled(); + raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags); + } else { + local_irq_restore(flags); + } +} + +/* * Wake up any no-CBs CPUs' kthreads that were waiting on the just-ended * grace period. */ @@ -1526,7 +1565,7 @@ bool rcu_is_nocb_cpu(int cpu) * Kick the GP kthread for this NOCB group. Caller holds ->nocb_lock * and this function releases it. */ -static void __wake_nocb_gp(struct rcu_data *rdp, bool force, +static void wake_nocb_gp(struct rcu_data *rdp, bool force, unsigned long flags) __releases(rdp->nocb_lock) { @@ -1537,326 +1576,168 @@ static void __wake_nocb_gp(struct rcu_data *rdp, bool force, raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags); return; } - if (rdp_gp->nocb_gp_sleep || force) { - /* Prior smp_mb__after_atomic() orders against prior enqueue. */ - WRITE_ONCE(rdp_gp->nocb_gp_sleep, false); + if (READ_ONCE(rdp_gp->nocb_gp_sleep) || force) { del_timer(&rdp->nocb_timer); raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags); - smp_mb(); /* ->nocb_gp_sleep before swake_up_one(). */ - swake_up_one(&rdp_gp->nocb_gp_wq); + smp_mb(); /* enqueue before ->nocb_gp_sleep. */ + raw_spin_lock_irqsave(&rdp_gp->nocb_lock, flags); + WRITE_ONCE(rdp_gp->nocb_gp_sleep, false); + raw_spin_unlock_irqrestore(&rdp_gp->nocb_lock, flags); + wake_up_process(rdp_gp->nocb_gp_kthread); } else { raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags); } } /* - * Kick the GP kthread for this NOCB group, but caller has not - * acquired locks. - */ -static void wake_nocb_gp(struct rcu_data *rdp, bool force) -{ - unsigned long flags; - - raw_spin_lock_irqsave(&rdp->nocb_lock, flags); - __wake_nocb_gp(rdp, force, flags); -} - -/* * Arrange to wake the GP kthread for this NOCB group at some future * time when it is safe to do so. */ static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype, const char *reason) { - unsigned long flags; - - raw_spin_lock_irqsave(&rdp->nocb_lock, flags); if (rdp->nocb_defer_wakeup == RCU_NOCB_WAKE_NOT) mod_timer(&rdp->nocb_timer, jiffies + 1); WRITE_ONCE(rdp->nocb_defer_wakeup, waketype); trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, reason); - raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags); -} - -/* Does rcu_barrier need to queue an RCU callback on the specified CPU? */ -static bool rcu_nocb_cpu_needs_barrier(int cpu) -{ - struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu); - unsigned long ret; -#ifdef CONFIG_PROVE_RCU - struct rcu_head *rhp; -#endif /* #ifdef CONFIG_PROVE_RCU */ - - /* - * Check count of all no-CBs callbacks awaiting invocation. - * There needs to be a barrier before this function is called, - * but associated with a prior determination that no more - * callbacks would be posted. In the worst case, the first - * barrier in rcu_barrier() suffices (but the caller cannot - * necessarily rely on this, not a substitute for the caller - * getting the concurrency design right!). There must also be a - * barrier between the following load and posting of a callback - * (if a callback is in fact needed). This is associated with an - * atomic_inc() in the caller. - */ - ret = rcu_get_n_cbs_nocb_cpu(rdp); - -#ifdef CONFIG_PROVE_RCU - rhp = READ_ONCE(rdp->nocb_head); - if (!rhp) - rhp = READ_ONCE(rdp->nocb_gp_head); - if (!rhp) - rhp = READ_ONCE(rdp->nocb_cb_head); - - /* Having no rcuo kthread but CBs after scheduler starts is bad! */ - if (!READ_ONCE(rdp->nocb_cb_kthread) && rhp && - rcu_scheduler_fully_active) { - /* RCU callback enqueued before CPU first came online??? */ - pr_err("RCU: Never-onlined no-CBs CPU %d has CB %p\n", - cpu, rhp->func); - WARN_ON_ONCE(1); - } -#endif /* #ifdef CONFIG_PROVE_RCU */ - - return !!ret; } /* - * Enqueue the specified string of rcu_head structures onto the specified - * CPU's no-CBs lists. The CPU is specified by rdp, the head of the - * string by rhp, and the tail of the string by rhtp. The non-lazy/lazy - * counts are supplied by rhcount and rhcount_lazy. + * Awaken the no-CBs grace-period kthead if needed, either due to it + * legitimately being asleep or due to overload conditions. * * If warranted, also wake up the kthread servicing this CPUs queues. */ -static void __call_rcu_nocb_enqueue(struct rcu_data *rdp, - struct rcu_head *rhp, - struct rcu_head **rhtp, - int rhcount, int rhcount_lazy, - unsigned long flags) +static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_alldone, + unsigned long flags) + __releases(rdp->nocb_lock) { int len; - struct rcu_head **old_rhpp; struct task_struct *t; - /* Enqueue the callback on the nocb list and update counts. */ - atomic_long_add(rhcount, &rdp->nocb_q_count); - /* rcu_barrier() relies on ->nocb_q_count add before xchg. */ - old_rhpp = xchg(&rdp->nocb_tail, rhtp); - WRITE_ONCE(*old_rhpp, rhp); - atomic_long_add(rhcount_lazy, &rdp->nocb_q_count_lazy); - smp_mb__after_atomic(); /* Store *old_rhpp before _wake test. */ - - /* If we are not being polled and there is a kthread, awaken it ... */ + // If we are being polled or there is no kthread, just leave. t = READ_ONCE(rdp->nocb_gp_kthread); if (rcu_nocb_poll || !t) { trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WakeNotPoll")); + rcu_nocb_unlock_irqrestore(rdp, flags); return; } - len = rcu_get_n_cbs_nocb_cpu(rdp); - if (old_rhpp == &rdp->nocb_head) { + // Need to actually to a wakeup. + len = rcu_segcblist_n_cbs(&rdp->cblist); + if (was_alldone) { if (!irqs_disabled_flags(flags)) { /* ... if queue was empty ... */ - wake_nocb_gp(rdp, false); + wake_nocb_gp(rdp, false, flags); trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WakeEmpty")); } else { wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE, TPS("WakeEmptyIsDeferred")); + rcu_nocb_unlock_irqrestore(rdp, flags); } rdp->qlen_last_fqs_check = 0; } else if (len > rdp->qlen_last_fqs_check + qhimark) { /* ... or if many callbacks queued. */ if (!irqs_disabled_flags(flags)) { - wake_nocb_gp(rdp, true); + wake_nocb_gp(rdp, true, flags); trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WakeOvf")); } else { wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE_FORCE, TPS("WakeOvfIsDeferred")); + rcu_nocb_unlock_irqrestore(rdp, flags); } rdp->qlen_last_fqs_check = LONG_MAX / 2; } else { trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WakeNot")); + rcu_nocb_unlock_irqrestore(rdp, flags); } + if (!irqs_disabled_flags(flags)) + lockdep_assert_irqs_enabled(); return; } /* - * This is a helper for __call_rcu(), which invokes this when the normal - * callback queue is inoperable. If this is not a no-CBs CPU, this - * function returns failure back to __call_rcu(), which can complain - * appropriately. - * - * Otherwise, this function queues the callback where the corresponding - * "rcuo" kthread can find it. - */ -static bool __call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *rhp, - bool lazy, unsigned long flags) -{ - - if (!rcu_segcblist_is_offloaded(&rdp->cblist)) - return false; - __call_rcu_nocb_enqueue(rdp, rhp, &rhp->next, 1, lazy, flags); - if (__is_kfree_rcu_offset((unsigned long)rhp->func)) - trace_rcu_kfree_callback(rcu_state.name, rhp, - (unsigned long)rhp->func, - -atomic_long_read(&rdp->nocb_q_count_lazy), - -rcu_get_n_cbs_nocb_cpu(rdp)); - else - trace_rcu_callback(rcu_state.name, rhp, - -atomic_long_read(&rdp->nocb_q_count_lazy), - -rcu_get_n_cbs_nocb_cpu(rdp)); - - return true; -} - -/* - * Adopt orphaned callbacks on a no-CBs CPU, or return 0 if this is - * not a no-CBs CPU. - */ -static bool __maybe_unused rcu_nocb_adopt_orphan_cbs(struct rcu_data *my_rdp, - struct rcu_data *rdp, - unsigned long flags) -{ - lockdep_assert_irqs_disabled(); - if (!rcu_segcblist_is_offloaded(&my_rdp->cblist)) - return false; /* Not NOCBs CPU, caller must migrate CBs. */ - __call_rcu_nocb_enqueue(my_rdp, rcu_segcblist_head(&rdp->cblist), - rcu_segcblist_tail(&rdp->cblist), - rcu_segcblist_n_cbs(&rdp->cblist), - rcu_segcblist_n_lazy_cbs(&rdp->cblist), flags); - rcu_segcblist_init(&rdp->cblist); - rcu_segcblist_disable(&rdp->cblist); - return true; -} - -/* - * If necessary, kick off a new grace period, and either way wait - * for a subsequent grace period to complete. - */ -static void rcu_nocb_wait_gp(struct rcu_data *rdp) -{ - unsigned long c; - bool d; - unsigned long flags; - bool needwake; - struct rcu_node *rnp = rdp->mynode; - - local_irq_save(flags); - c = rcu_seq_snap(&rcu_state.gp_seq); - if (!rdp->gpwrap && ULONG_CMP_GE(rdp->gp_seq_needed, c)) { - local_irq_restore(flags); - } else { - raw_spin_lock_rcu_node(rnp); /* irqs already disabled. */ - needwake = rcu_start_this_gp(rnp, rdp, c); - raw_spin_unlock_irqrestore_rcu_node(rnp, flags); - if (needwake) - rcu_gp_kthread_wake(); - } - - /* - * Wait for the grace period. Do so interruptibly to avoid messing - * up the load average. - */ - trace_rcu_this_gp(rnp, rdp, c, TPS("StartWait")); - for (;;) { - swait_event_interruptible_exclusive( - rnp->nocb_gp_wq[rcu_seq_ctr(c) & 0x1], - (d = rcu_seq_done(&rnp->gp_seq, c))); - if (likely(d)) - break; - WARN_ON(signal_pending(current)); - trace_rcu_this_gp(rnp, rdp, c, TPS("ResumeWait")); - } - trace_rcu_this_gp(rnp, rdp, c, TPS("EndWait")); - smp_mb(); /* Ensure that CB invocation happens after GP end. */ -} - -/* - * No-CBs GP kthreads come here to wait for additional callbacks to show up. - * This function does not return until callbacks appear. + * No-CBs GP kthreads come here to wait for additional callbacks to show up + * or for grace periods to end. */ static void nocb_gp_wait(struct rcu_data *my_rdp) { - bool firsttime = true; + int __maybe_unused cpu = my_rdp->cpu; + unsigned long cur_gp_seq; unsigned long flags; bool gotcbs; + bool needwait_gp = false; + bool needwake; + bool needwake_gp; struct rcu_data *rdp; - struct rcu_head **tail; - - /* Wait for callbacks to appear. */ - if (!rcu_nocb_poll) { - trace_rcu_nocb_wake(rcu_state.name, my_rdp->cpu, TPS("Sleep")); - swait_event_interruptible_exclusive(my_rdp->nocb_gp_wq, - !READ_ONCE(my_rdp->nocb_gp_sleep)); - raw_spin_lock_irqsave(&my_rdp->nocb_lock, flags); - my_rdp->nocb_gp_sleep = true; - WRITE_ONCE(my_rdp->nocb_defer_wakeup, RCU_NOCB_WAKE_NOT); - del_timer(&my_rdp->nocb_timer); - raw_spin_unlock_irqrestore(&my_rdp->nocb_lock, flags); - } else if (firsttime) { - firsttime = false; /* Don't drown trace log with "Poll"! */ - trace_rcu_nocb_wake(rcu_state.name, my_rdp->cpu, TPS("Poll")); - } + struct rcu_node *rnp; + unsigned long wait_gp_seq; /* - * Each pass through the following loop checks for CBs. - * We are our own first CB kthread. Any CBs found are moved to - * nocb_gp_head, where they await a grace period. + * Each pass through the following loop checks for CBs and for the + * nearest grace period (if any) to wait for next. The CB kthreads + * and the global grace-period kthread are awakened if needed. */ - gotcbs = false; - smp_mb(); /* wakeup and _sleep before ->nocb_head reads. */ for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_cb_rdp) { - rdp->nocb_gp_head = READ_ONCE(rdp->nocb_head); - if (!rdp->nocb_gp_head) - continue; /* No CBs here, try next. */ - - /* Move callbacks to wait-for-GP list, which is empty. */ - WRITE_ONCE(rdp->nocb_head, NULL); - rdp->nocb_gp_tail = xchg(&rdp->nocb_tail, &rdp->nocb_head); - gotcbs = true; - } - - /* No callbacks? Sleep a bit if polling, and go retry. */ - if (unlikely(!gotcbs)) { - WARN_ON(signal_pending(current)); - if (rcu_nocb_poll) { - schedule_timeout_interruptible(1); - } else { - trace_rcu_nocb_wake(rcu_state.name, my_rdp->cpu, - TPS("WokeEmpty")); + if (rcu_segcblist_empty(&rdp->cblist)) + continue; /* No callbacks here, try next. */ + rnp = rdp->mynode; + raw_spin_lock_irqsave(&rdp->nocb_lock, flags); + WRITE_ONCE(my_rdp->nocb_defer_wakeup, RCU_NOCB_WAKE_NOT); + del_timer(&my_rdp->nocb_timer); + raw_spin_lock_rcu_node(rnp); /* irqs already disabled. */ + needwake_gp = rcu_advance_cbs(rnp, rdp); + raw_spin_unlock_rcu_node(rnp); /* irqs remain disabled. */ + // Need to wait on some grace period? + if (rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq)) { + if (!needwait_gp || + ULONG_CMP_LT(cur_gp_seq, wait_gp_seq)) + wait_gp_seq = cur_gp_seq; + needwait_gp = true; } - return; - } - - /* Wait for one grace period. */ - rcu_nocb_wait_gp(my_rdp); - - /* Each pass through this loop wakes a CB kthread, if needed. */ - for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_cb_rdp) { - if (!rcu_nocb_poll && - READ_ONCE(rdp->nocb_head) && - READ_ONCE(my_rdp->nocb_gp_sleep)) { - raw_spin_lock_irqsave(&my_rdp->nocb_lock, flags); - my_rdp->nocb_gp_sleep = false;/* No need to sleep.*/ - raw_spin_unlock_irqrestore(&my_rdp->nocb_lock, flags); + if (rcu_segcblist_ready_cbs(&rdp->cblist)) { + needwake = rdp->nocb_cb_sleep; + WRITE_ONCE(rdp->nocb_cb_sleep, false); + smp_mb(); /* CB invocation -after- GP end. */ + } else { + needwake = false; } - if (!rdp->nocb_gp_head) - continue; /* No CBs, so no need to wake kthread. */ - - /* Append callbacks to CB kthread's "done" list. */ - raw_spin_lock_irqsave(&rdp->nocb_lock, flags); - tail = rdp->nocb_cb_tail; - rdp->nocb_cb_tail = rdp->nocb_gp_tail; - *tail = rdp->nocb_gp_head; raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags); - if (tail == &rdp->nocb_cb_head) { - /* List was empty, so wake up the kthread. */ + if (needwake) { swake_up_one(&rdp->nocb_cb_wq); + gotcbs = true; } + if (needwake_gp) + rcu_gp_kthread_wake(); + } + + if (rcu_nocb_poll) { + /* Polling, so trace if first poll in the series. */ + if (gotcbs) + trace_rcu_nocb_wake(rcu_state.name, cpu, TPS("Poll")); + schedule_timeout_interruptible(1); + } else if (!needwait_gp) { + /* Wait for callbacks to appear. */ + trace_rcu_nocb_wake(rcu_state.name, cpu, TPS("Sleep")); + swait_event_interruptible_exclusive(my_rdp->nocb_gp_wq, + !READ_ONCE(my_rdp->nocb_gp_sleep)); + } else { + rnp = my_rdp->mynode; + trace_rcu_this_gp(rnp, my_rdp, wait_gp_seq, TPS("StartWait")); + swait_event_interruptible_exclusive( + rnp->nocb_gp_wq[rcu_seq_ctr(wait_gp_seq) & 0x1], + rcu_seq_done(&rnp->gp_seq, wait_gp_seq) || + !READ_ONCE(my_rdp->nocb_gp_sleep)); + trace_rcu_this_gp(rnp, my_rdp, wait_gp_seq, TPS("EndWait")); + } + if (!rcu_nocb_poll) { + raw_spin_lock_irqsave(&my_rdp->nocb_lock, flags); + WRITE_ONCE(my_rdp->nocb_gp_sleep, true); + raw_spin_unlock_irqrestore(&my_rdp->nocb_lock, flags); } + WARN_ON(signal_pending(current)); } /* @@ -1871,92 +1752,69 @@ static int rcu_nocb_gp_kthread(void *arg) { struct rcu_data *rdp = arg; - for (;;) + for (;;) { nocb_gp_wait(rdp); + cond_resched_tasks_rcu_qs(); + } return 0; } /* - * No-CBs CB kthreads come here to wait for additional callbacks to show up. - * This function returns true ("keep waiting") until callbacks appear and - * then false ("stop waiting") when callbacks finally do appear. + * Invoke any ready callbacks from the corresponding no-CBs CPU, + * then, if there are no more, wait for more to appear. */ -static bool nocb_cb_wait(struct rcu_data *rdp) +static void nocb_cb_wait(struct rcu_data *rdp) { + unsigned long flags; + bool needwake_gp = false; + struct rcu_node *rnp = rdp->mynode; + + local_irq_save(flags); + rcu_momentary_dyntick_idle(); + local_irq_restore(flags); + local_bh_disable(); + rcu_do_batch(rdp); + local_bh_enable(); + lockdep_assert_irqs_enabled(); + raw_spin_lock_irqsave(&rdp->nocb_lock, flags); + raw_spin_lock_rcu_node(rnp); /* irqs already disabled. */ + needwake_gp = rcu_advance_cbs(rdp->mynode, rdp); + raw_spin_unlock_rcu_node(rnp); /* irqs remain disabled. */ + if (rcu_segcblist_ready_cbs(&rdp->cblist)) { + raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags); + if (needwake_gp) + rcu_gp_kthread_wake(); + return; + } + trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("CBSleep")); + WRITE_ONCE(rdp->nocb_cb_sleep, true); + raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags); + if (needwake_gp) + rcu_gp_kthread_wake(); swait_event_interruptible_exclusive(rdp->nocb_cb_wq, - READ_ONCE(rdp->nocb_cb_head)); - if (smp_load_acquire(&rdp->nocb_cb_head)) { /* VVV */ - /* ^^^ Ensure CB invocation follows _head test. */ - return false; + !READ_ONCE(rdp->nocb_cb_sleep)); + if (!smp_load_acquire(&rdp->nocb_cb_sleep)) { /* VVV */ + /* ^^^ Ensure CB invocation follows _sleep test. */ + return; } WARN_ON(signal_pending(current)); trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WokeEmpty")); - return true; } /* - * Per-rcu_data kthread, but only for no-CBs CPUs. Each kthread invokes - * callbacks queued by the corresponding no-CBs CPU, however, there is an - * optional GP-CB relationship so that the grace-period kthreads don't - * have to do quite so many wakeups (as in they only need to wake the - * no-CBs GP kthreads, not the CB kthreads). + * Per-rcu_data kthread, but only for no-CBs CPUs. Repeatedly invoke + * nocb_cb_wait() to do the dirty work. */ static int rcu_nocb_cb_kthread(void *arg) { - int c, cl; - unsigned long flags; - struct rcu_head *list; - struct rcu_head *next; - struct rcu_head **tail; struct rcu_data *rdp = arg; - /* Each pass through this loop invokes one batch of callbacks */ + // Each pass through this loop does one callback batch, and, + // if there are no more ready callbacks, waits for them. for (;;) { - /* Wait for callbacks. */ - while (nocb_cb_wait(rdp)) - continue; - - /* Pull the ready-to-invoke callbacks onto local list. */ - raw_spin_lock_irqsave(&rdp->nocb_lock, flags); - list = rdp->nocb_cb_head; - rdp->nocb_cb_head = NULL; - tail = rdp->nocb_cb_tail; - rdp->nocb_cb_tail = &rdp->nocb_cb_head; - raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags); - if (WARN_ON_ONCE(!list)) - continue; - trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WokeNonEmpty")); - - /* Each pass through the following loop invokes a callback. */ - trace_rcu_batch_start(rcu_state.name, - atomic_long_read(&rdp->nocb_q_count_lazy), - rcu_get_n_cbs_nocb_cpu(rdp), -1); - c = cl = 0; - while (list) { - next = list->next; - /* Wait for enqueuing to complete, if needed. */ - while (next == NULL && &list->next != tail) { - trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, - TPS("WaitQueue")); - schedule_timeout_interruptible(1); - trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, - TPS("WokeQueue")); - next = list->next; - } - debug_rcu_head_unqueue(list); - local_bh_disable(); - if (__rcu_reclaim(rcu_state.name, list)) - cl++; - c++; - local_bh_enable(); - cond_resched_tasks_rcu_qs(); - list = next; - } - trace_rcu_batch_end(rcu_state.name, c, !!list, 0, 0, 1); - smp_mb__before_atomic(); /* _add after CB invocation. */ - atomic_long_add(-c, &rdp->nocb_q_count); - atomic_long_add(-cl, &rdp->nocb_q_count_lazy); + nocb_cb_wait(rdp); + cond_resched_tasks_rcu_qs(); } return 0; } @@ -1980,7 +1838,7 @@ static void do_nocb_deferred_wakeup_common(struct rcu_data *rdp) } ndw = READ_ONCE(rdp->nocb_defer_wakeup); WRITE_ONCE(rdp->nocb_defer_wakeup, RCU_NOCB_WAKE_NOT); - __wake_nocb_gp(rdp, ndw == RCU_NOCB_WAKE_FORCE, flags); + wake_nocb_gp(rdp, ndw == RCU_NOCB_WAKE_FORCE, flags); trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("DeferredWake")); } @@ -2194,10 +2052,21 @@ static unsigned long rcu_get_n_cbs_nocb_cpu(struct rcu_data *rdp) #else /* #ifdef CONFIG_RCU_NOCB_CPU */ -static bool rcu_nocb_cpu_needs_barrier(int cpu) +/* No ->nocb_lock to acquire. */ +static void rcu_nocb_lock(struct rcu_data *rdp) { - WARN_ON_ONCE(1); /* Should be dead code. */ - return false; +} + +/* No ->nocb_lock to release. */ +static void rcu_nocb_unlock(struct rcu_data *rdp) +{ +} + +/* No ->nocb_lock to release. */ +static void rcu_nocb_unlock_irqrestore(struct rcu_data *rdp, + unsigned long flags) +{ + local_irq_restore(flags); } static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq) @@ -2213,17 +2082,10 @@ static void rcu_init_one_nocb(struct rcu_node *rnp) { } -static bool __call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *rhp, - bool lazy, unsigned long flags) +static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_empty, + unsigned long flags) { - return false; -} - -static bool __maybe_unused rcu_nocb_adopt_orphan_cbs(struct rcu_data *my_rdp, - struct rcu_data *rdp, - unsigned long flags) -{ - return false; + WARN_ON_ONCE(1); /* Should be dead code! */ } static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp) |