rcu: add call_rcu

Asynchronous callbacks provided by call_rcu are particularly important
for QEMU, because the BQL makes it hard to use synchronize_rcu.

In addition, the current RCU implementation is not particularly friendly
to multiple concurrent synchronize_rcu callers, making call_rcu even
more important.

Reviewed-by: Fam Zheng <famz@redhat.com>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
Paolo Bonzini 2013-05-13 17:49:24 +02:00
parent d62cb4f2fd
commit 26387f86c9
3 changed files with 247 additions and 4 deletions


@@ -82,7 +82,50 @@ The core RCU API is small:
Note that it would be valid for another update to come while
synchronize_rcu is running. Because of this, it is better that
the updater releases any locks it may hold before calling
synchronize_rcu.
synchronize_rcu. If this is not possible (for example, because
the updater is protected by the BQL), you can use call_rcu.
void call_rcu1(struct rcu_head * head,
void (*func)(struct rcu_head *head));
This function invokes func(head) after all pre-existing RCU
read-side critical sections on all threads have completed. This
marks the end of the removal phase, with func taking care
asynchronously of the reclamation phase.
The foo struct needs to have an rcu_head structure added,
perhaps as follows:
struct foo {
    struct rcu_head rcu;
    int a;
    char b;
    long c;
};
so that the reclaimer function can fetch the struct foo address
and free it:
call_rcu1(&foo.rcu, foo_reclaim);
void foo_reclaim(struct rcu_head *rp)
{
    struct foo *fp = container_of(rp, struct foo, rcu);
    g_free(fp);
}
For the common case where the rcu_head member is the first of the
struct, you can use the following macro.
void call_rcu(T *p,
void (*func)(T *p),
field-name);
call_rcu1 is typically used through this macro, in the common case
where the "struct rcu_head" is the first field in the struct. In
the above case, one could have written simply:
call_rcu(&foo, g_free, rcu);
typeof(*p) atomic_rcu_read(p);
@@ -153,6 +196,11 @@ DIFFERENCES WITH LINUX
- atomic_rcu_read and atomic_rcu_set replace rcu_dereference and
rcu_assign_pointer. They take a _pointer_ to the variable being accessed.
- call_rcu is a macro that has an extra argument (the name of the first
field in the struct, which must be a struct rcu_head), and expects the
type of the callback's argument to be the type of the first argument.
call_rcu1 is the same as Linux's call_rcu.
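For instance (an illustrative sketch; fp and foo_reclaim are placeholder
names for a pointer to a struct foo whose first field is "struct rcu_head
rcu", and for its reclaimer), Linux code passes the embedded rcu_head and
a callback declared to take a struct rcu_head pointer:

    call_rcu(&fp->rcu, foo_reclaim);   /* foo_reclaim(struct rcu_head *) */

whereas the QEMU macro takes the enclosing object, a callback declared to
take a struct foo pointer, and the name of the rcu_head field:

    call_rcu(fp, foo_reclaim, rcu);    /* foo_reclaim(struct foo *) */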
RCU PATTERNS
============
@@ -206,7 +254,47 @@ The write side looks simply like this (with appropriate locking):
synchronize_rcu();
free(old);
Note that the same idiom would be possible with reader/writer
If the processing cannot be done purely within the critical section, it
is possible to combine this idiom with a "real" reference count:
rcu_read_lock();
p = atomic_rcu_read(&foo);
foo_ref(p);
rcu_read_unlock();
/* do something with p. */
foo_unref(p);
The write side can be like this:
qemu_mutex_lock(&foo_mutex);
old = foo;
atomic_rcu_set(&foo, new);
qemu_mutex_unlock(&foo_mutex);
synchronize_rcu();
foo_unref(old);
or with call_rcu:
qemu_mutex_lock(&foo_mutex);
old = foo;
atomic_rcu_set(&foo, new);
qemu_mutex_unlock(&foo_mutex);
call_rcu(old, foo_unref, rcu);
In both cases, the write side only performs removal. Reclamation
happens when the last reference to a "foo" object is dropped.
Using synchronize_rcu() is undesirably expensive, because the
last reference may be dropped on the read side. Hence you can
use call_rcu() instead:
void foo_unref(struct foo *p) {
    if (atomic_fetch_dec(&p->refcount) == 1) {
        call_rcu(p, foo_destroy, rcu);
    }
}
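foo_destroy is not spelled out above; a minimal sketch, assuming the
object only needs to be freed once the grace period has elapsed, could be:

    void foo_destroy(struct foo *p)
    {
        g_free(p);
    }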
Note that the same idioms would be possible with reader/writer
locks:
read_lock(&foo_rwlock);         write_mutex_lock(&foo_rwlock);
@@ -216,13 +304,27 @@ locks:
write_mutex_unlock(&foo_rwlock);
free(p);
------------------------------------------------------------------
read_lock(&foo_rwlock);         write_mutex_lock(&foo_rwlock);
p = foo;                        old = foo;
foo_ref(p);                     foo = new;
read_unlock(&foo_rwlock);       foo_unref(old);
/* do something with p. */      write_mutex_unlock(&foo_rwlock);
read_lock(&foo_rwlock);
foo_unref(p);
read_unlock(&foo_rwlock);
foo_unref could use a mechanism such as bottom halves to move deallocation
out of the write-side critical section.
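For instance, a sketch using QEMU bottom halves (this assumes a
hypothetical QEMUBH field named free_bh in struct foo, which is not part
of the examples above):

    void foo_free_bh(void *opaque)
    {
        struct foo *p = opaque;

        qemu_bh_delete(p->free_bh);
        g_free(p);
    }

    void foo_unref(struct foo *p)
    {
        if (atomic_fetch_dec(&p->refcount) == 1) {
            /* Defer the actual deallocation to the main loop. */
            p->free_bh = qemu_bh_new(foo_free_bh, p);
            qemu_bh_schedule(p->free_bh);
        }
    }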
RCU resizable arrays
--------------------
Resizable arrays can be used with RCU. The expensive RCU synchronization
only needs to take place when the array is resized. The two items to
take care of are:
(or call_rcu) only needs to take place when the array is resized.
The two items to take care of are:
- ensuring that the old version of the array is available between removal
and reclamation;


@@ -118,6 +118,28 @@ extern void synchronize_rcu(void);
extern void rcu_register_thread(void);
extern void rcu_unregister_thread(void);
struct rcu_head;
typedef void RCUCBFunc(struct rcu_head *head);
struct rcu_head {
    struct rcu_head *next;
    RCUCBFunc *func;
};
extern void call_rcu1(struct rcu_head *head, RCUCBFunc *func);
/* The operands of the minus operator must have the same type,
* which must be the one that we specify in the cast.
*/
#define call_rcu(head, func, field)                                      \
    call_rcu1(({                                                         \
         char __attribute__((unused))                                    \
            offset_must_be_zero[-offsetof(typeof(*(head)), field)],      \
            func_type_invalid = (func) - (void (*)(typeof(head)))(func); \
         &(head)->field;                                                 \
      }),                                                                \
      (RCUCBFunc *)(func))
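/* For example, given an illustrative struct foo whose first field is
 * "struct rcu_head rcu" and a reclaimer declared as
 * void foo_reclaim(struct foo *fp), the invocation
 *
 *     call_rcu(fp, foo_reclaim, rcu);
 *
 * boils down to call_rcu1(&fp->rcu, (RCUCBFunc *)foo_reclaim); the dummy
 * char declarations only check, at compile time, that the field is at
 * offset zero and that foo_reclaim indeed takes a struct foo pointer.
 */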
#ifdef __cplusplus
}
#endif


@@ -26,6 +26,7 @@
* IBM's contributions to this file may be relicensed under LGPLv2 or later.
*/
#include "qemu-common.h"
#include <stdio.h>
#include <assert.h>
#include <stdlib.h>
@@ -33,6 +34,7 @@
#include <errno.h>
#include "qemu/rcu.h"
#include "qemu/atomic.h"
#include "qemu/thread.h"
/*
* Global grace period counter. Bit 0 is always one in rcu_gp_ctr.
@@ -149,6 +151,116 @@ void synchronize_rcu(void)
qemu_mutex_unlock(&rcu_gp_lock);
}
#define RCU_CALL_MIN_SIZE 30
/* Multi-producer, single-consumer queue based on urcu/static/wfqueue.h
* from liburcu. Note that head is only used by the consumer.
*/
static struct rcu_head dummy;
static struct rcu_head *head = &dummy, **tail = &dummy.next;
static int rcu_call_count;
static QemuEvent rcu_call_ready_event;
static void enqueue(struct rcu_head *node)
{
    struct rcu_head **old_tail;

    node->next = NULL;
    /* Make the node the new tail; concurrent enqueuers now append after it. */
    old_tail = atomic_xchg(&tail, &node->next);
    /* Publish the node by linking it after the previous tail.  Until this
     * store, the consumer may still see a NULL next pointer there and has
     * to wait (see try_dequeue).
     */
    atomic_mb_set(old_tail, node);
}
static struct rcu_head *try_dequeue(void)
{
    struct rcu_head *node, *next;

retry:
    /* Test for an empty list, which we do not expect.  Note that for
     * the consumer, head and tail are always consistent.  The head
     * is consistent because only the consumer reads/writes it.
     * The tail is consistent because writing it is the first step of
     * enqueuing.  It is only the next pointers that might be
     * inconsistent.
     */
    if (head == &dummy && atomic_mb_read(&tail) == &dummy.next) {
        abort();
    }

    /* If the head node has NULL in its next pointer, the value is
     * wrong and we need to wait until its enqueuer finishes the update.
     */
    node = head;
    next = atomic_mb_read(&head->next);
    if (!next) {
        return NULL;
    }

    /* Since we are the sole consumer, and we excluded the empty case
     * above, the queue will always have at least two nodes: the
     * dummy node, and the one being removed.  So we do not need to update
     * the tail pointer.
     */
    head = next;

    /* If we dequeued the dummy node, add it back at the end and retry. */
    if (node == &dummy) {
        enqueue(node);
        goto retry;
    }

    return node;
}
static void *call_rcu_thread(void *opaque)
{
    struct rcu_head *node;

    for (;;) {
        int tries = 0;
        int n = atomic_read(&rcu_call_count);

        /* Heuristically wait for a decent number of callbacks to pile up.
         * Fetch rcu_call_count now; we need only process elements that
         * were added before synchronize_rcu() starts.
         */
        while (n < RCU_CALL_MIN_SIZE && ++tries <= 5) {
            g_usleep(100000);
            qemu_event_reset(&rcu_call_ready_event);
            n = atomic_read(&rcu_call_count);
            if (n < RCU_CALL_MIN_SIZE) {
                qemu_event_wait(&rcu_call_ready_event);
                n = atomic_read(&rcu_call_count);
            }
        }

        atomic_sub(&rcu_call_count, n);
        synchronize_rcu();
        while (n > 0) {
            node = try_dequeue();
            while (!node) {
                qemu_event_reset(&rcu_call_ready_event);
                node = try_dequeue();
                if (!node) {
                    qemu_event_wait(&rcu_call_ready_event);
                    node = try_dequeue();
                }
            }

            n--;
            node->func(node);
        }
    }
    abort();
}
void call_rcu1(struct rcu_head *node, void (*func)(struct rcu_head *node))
{
    node->func = func;
    enqueue(node);
    atomic_inc(&rcu_call_count);
    qemu_event_set(&rcu_call_ready_event);
}
void rcu_register_thread(void)
{
    assert(rcu_reader.ctr == 0);
@@ -166,7 +278,14 @@ void rcu_unregister_thread(void)
static void __attribute__((__constructor__)) rcu_init(void)
{
    QemuThread thread;

    qemu_mutex_init(&rcu_gp_lock);
    qemu_event_init(&rcu_gp_event, true);

    qemu_event_init(&rcu_call_ready_event, false);
    qemu_thread_create(&thread, "call_rcu", call_rcu_thread,
                       NULL, QEMU_THREAD_DETACHED);

    rcu_register_thread();
}