mirror of https://gitee.com/openkylin/qemu.git
aio: introduce aio_co_schedule and aio_co_wake
aio_co_wake provides the infrastructure to start a coroutine on a "home" AioContext. It will be used by CoMutex and CoQueue, so that coroutines don't jump from one context to another when they go to sleep on a mutex or waitqueue. However, it can also be used as a more efficient alternative to one-shot bottom halves, and saves the effort of tracking which AioContext a coroutine is running on. aio_co_schedule is the part of aio_co_wake that starts a coroutine on a remove AioContext, but it is also useful to implement e.g. bdrv_set_aio_context callbacks. The implementation of aio_co_schedule is based on a lock-free multiple-producer, single-consumer queue. The multiple producers use cmpxchg to add to a LIFO stack. The consumer (a per-AioContext bottom half) grabs all items added so far, inverts the list to make it FIFO, and goes through it one item at a time until it's empty. The data structure was inspired by OSv, which uses it in the very code we'll "port" to QEMU for the thread-safe CoMutex. Most of the new code is really tests. Signed-off-by: Paolo Bonzini <pbonzini@redhat.com> Reviewed-by: Fam Zheng <famz@redhat.com> Message-id: 20170213135235.12274-3-pbonzini@redhat.com Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
This commit is contained in:
parent
c2b38b277a
commit
0c330a734b
|
@ -47,6 +47,7 @@ typedef void QEMUBHFunc(void *opaque);
|
||||||
typedef bool AioPollFn(void *opaque);
|
typedef bool AioPollFn(void *opaque);
|
||||||
typedef void IOHandler(void *opaque);
|
typedef void IOHandler(void *opaque);
|
||||||
|
|
||||||
|
struct Coroutine;
|
||||||
struct ThreadPool;
|
struct ThreadPool;
|
||||||
struct LinuxAioState;
|
struct LinuxAioState;
|
||||||
|
|
||||||
|
@ -108,6 +109,9 @@ struct AioContext {
|
||||||
bool notified;
|
bool notified;
|
||||||
EventNotifier notifier;
|
EventNotifier notifier;
|
||||||
|
|
||||||
|
QSLIST_HEAD(, Coroutine) scheduled_coroutines;
|
||||||
|
QEMUBH *co_schedule_bh;
|
||||||
|
|
||||||
/* Thread pool for performing work and receiving completion callbacks.
|
/* Thread pool for performing work and receiving completion callbacks.
|
||||||
* Has its own locking.
|
* Has its own locking.
|
||||||
*/
|
*/
|
||||||
|
@ -482,6 +486,34 @@ static inline bool aio_node_check(AioContext *ctx, bool is_external)
|
||||||
return !is_external || !atomic_read(&ctx->external_disable_cnt);
|
return !is_external || !atomic_read(&ctx->external_disable_cnt);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* aio_co_schedule:
|
||||||
|
* @ctx: the aio context
|
||||||
|
* @co: the coroutine
|
||||||
|
*
|
||||||
|
* Start a coroutine on a remote AioContext.
|
||||||
|
*
|
||||||
|
* The coroutine must not be entered by anyone else while aio_co_schedule()
|
||||||
|
* is active. In addition the coroutine must have yielded unless ctx
|
||||||
|
* is the context in which the coroutine is running (i.e. the value of
|
||||||
|
* qemu_get_current_aio_context() from the coroutine itself).
|
||||||
|
*/
|
||||||
|
void aio_co_schedule(AioContext *ctx, struct Coroutine *co);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* aio_co_wake:
|
||||||
|
* @co: the coroutine
|
||||||
|
*
|
||||||
|
* Restart a coroutine on the AioContext where it was running last, thus
|
||||||
|
* preventing coroutines from jumping from one context to another when they
|
||||||
|
* go to sleep.
|
||||||
|
*
|
||||||
|
* aio_co_wake may be executed either in coroutine or non-coroutine
|
||||||
|
* context. The coroutine must not be entered by anyone else while
|
||||||
|
* aio_co_wake() is active.
|
||||||
|
*/
|
||||||
|
void aio_co_wake(struct Coroutine *co);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return the AioContext whose event loop runs in the current thread.
|
* Return the AioContext whose event loop runs in the current thread.
|
||||||
*
|
*
|
||||||
|
|
|
@ -40,12 +40,21 @@ struct Coroutine {
|
||||||
CoroutineEntry *entry;
|
CoroutineEntry *entry;
|
||||||
void *entry_arg;
|
void *entry_arg;
|
||||||
Coroutine *caller;
|
Coroutine *caller;
|
||||||
|
|
||||||
|
/* Only used when the coroutine has terminated. */
|
||||||
QSLIST_ENTRY(Coroutine) pool_next;
|
QSLIST_ENTRY(Coroutine) pool_next;
|
||||||
|
|
||||||
size_t locks_held;
|
size_t locks_held;
|
||||||
|
|
||||||
/* Coroutines that should be woken up when we yield or terminate */
|
/* Coroutines that should be woken up when we yield or terminate.
|
||||||
|
* Only used when the coroutine is running.
|
||||||
|
*/
|
||||||
QSIMPLEQ_HEAD(, Coroutine) co_queue_wakeup;
|
QSIMPLEQ_HEAD(, Coroutine) co_queue_wakeup;
|
||||||
|
|
||||||
|
/* Only used when the coroutine has yielded. */
|
||||||
|
AioContext *ctx;
|
||||||
QSIMPLEQ_ENTRY(Coroutine) co_queue_next;
|
QSIMPLEQ_ENTRY(Coroutine) co_queue_next;
|
||||||
|
QSLIST_ENTRY(Coroutine) co_scheduled_next;
|
||||||
};
|
};
|
||||||
|
|
||||||
Coroutine *qemu_coroutine_new(void);
|
Coroutine *qemu_coroutine_new(void);
|
||||||
|
|
|
@ -48,9 +48,10 @@ check-unit-y += tests/test-aio$(EXESUF)
|
||||||
gcov-files-test-aio-y = util/async.c util/qemu-timer.o
|
gcov-files-test-aio-y = util/async.c util/qemu-timer.o
|
||||||
gcov-files-test-aio-$(CONFIG_WIN32) += util/aio-win32.c
|
gcov-files-test-aio-$(CONFIG_WIN32) += util/aio-win32.c
|
||||||
gcov-files-test-aio-$(CONFIG_POSIX) += util/aio-posix.c
|
gcov-files-test-aio-$(CONFIG_POSIX) += util/aio-posix.c
|
||||||
|
check-unit-y += tests/test-aio-multithread$(EXESUF)
|
||||||
|
gcov-files-test-aio-multithread-y = $(gcov-files-test-aio-y)
|
||||||
|
gcov-files-test-aio-multithread-y += util/qemu-coroutine.c tests/iothread.c
|
||||||
check-unit-y += tests/test-throttle$(EXESUF)
|
check-unit-y += tests/test-throttle$(EXESUF)
|
||||||
gcov-files-test-aio-$(CONFIG_WIN32) = aio-win32.c
|
|
||||||
gcov-files-test-aio-$(CONFIG_POSIX) = aio-posix.c
|
|
||||||
check-unit-y += tests/test-thread-pool$(EXESUF)
|
check-unit-y += tests/test-thread-pool$(EXESUF)
|
||||||
gcov-files-test-thread-pool-y = thread-pool.c
|
gcov-files-test-thread-pool-y = thread-pool.c
|
||||||
gcov-files-test-hbitmap-y = util/hbitmap.c
|
gcov-files-test-hbitmap-y = util/hbitmap.c
|
||||||
|
@ -508,7 +509,7 @@ test-qapi-obj-y = tests/test-qapi-visit.o tests/test-qapi-types.o \
|
||||||
$(test-qom-obj-y)
|
$(test-qom-obj-y)
|
||||||
test-crypto-obj-y = $(crypto-obj-y) $(test-qom-obj-y)
|
test-crypto-obj-y = $(crypto-obj-y) $(test-qom-obj-y)
|
||||||
test-io-obj-y = $(io-obj-y) $(test-crypto-obj-y)
|
test-io-obj-y = $(io-obj-y) $(test-crypto-obj-y)
|
||||||
test-block-obj-y = $(block-obj-y) $(test-io-obj-y)
|
test-block-obj-y = $(block-obj-y) $(test-io-obj-y) tests/iothread.o
|
||||||
|
|
||||||
tests/check-qint$(EXESUF): tests/check-qint.o $(test-util-obj-y)
|
tests/check-qint$(EXESUF): tests/check-qint.o $(test-util-obj-y)
|
||||||
tests/check-qstring$(EXESUF): tests/check-qstring.o $(test-util-obj-y)
|
tests/check-qstring$(EXESUF): tests/check-qstring.o $(test-util-obj-y)
|
||||||
|
@ -523,6 +524,7 @@ tests/check-qom-proplist$(EXESUF): tests/check-qom-proplist.o $(test-qom-obj-y)
|
||||||
tests/test-char$(EXESUF): tests/test-char.o $(test-util-obj-y) $(qtest-obj-y) $(test-io-obj-y) $(chardev-obj-y)
|
tests/test-char$(EXESUF): tests/test-char.o $(test-util-obj-y) $(qtest-obj-y) $(test-io-obj-y) $(chardev-obj-y)
|
||||||
tests/test-coroutine$(EXESUF): tests/test-coroutine.o $(test-block-obj-y)
|
tests/test-coroutine$(EXESUF): tests/test-coroutine.o $(test-block-obj-y)
|
||||||
tests/test-aio$(EXESUF): tests/test-aio.o $(test-block-obj-y)
|
tests/test-aio$(EXESUF): tests/test-aio.o $(test-block-obj-y)
|
||||||
|
tests/test-aio-multithread$(EXESUF): tests/test-aio-multithread.o $(test-block-obj-y)
|
||||||
tests/test-throttle$(EXESUF): tests/test-throttle.o $(test-block-obj-y)
|
tests/test-throttle$(EXESUF): tests/test-throttle.o $(test-block-obj-y)
|
||||||
tests/test-blockjob$(EXESUF): tests/test-blockjob.o $(test-block-obj-y) $(test-util-obj-y)
|
tests/test-blockjob$(EXESUF): tests/test-blockjob.o $(test-block-obj-y) $(test-util-obj-y)
|
||||||
tests/test-blockjob-txn$(EXESUF): tests/test-blockjob-txn.o $(test-block-obj-y) $(test-util-obj-y)
|
tests/test-blockjob-txn$(EXESUF): tests/test-blockjob-txn.o $(test-block-obj-y) $(test-util-obj-y)
|
||||||
|
|
|
@ -0,0 +1,91 @@
|
||||||
|
/*
|
||||||
|
* Event loop thread implementation for unit tests
|
||||||
|
*
|
||||||
|
* Copyright Red Hat Inc., 2013, 2016
|
||||||
|
*
|
||||||
|
* Authors:
|
||||||
|
* Stefan Hajnoczi <stefanha@redhat.com>
|
||||||
|
* Paolo Bonzini <pbonzini@redhat.com>
|
||||||
|
*
|
||||||
|
* This work is licensed under the terms of the GNU GPL, version 2 or later.
|
||||||
|
* See the COPYING file in the top-level directory.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "qemu/osdep.h"
|
||||||
|
#include "qapi/error.h"
|
||||||
|
#include "block/aio.h"
|
||||||
|
#include "qemu/main-loop.h"
|
||||||
|
#include "qemu/rcu.h"
|
||||||
|
#include "iothread.h"
|
||||||
|
|
||||||
|
struct IOThread {
|
||||||
|
AioContext *ctx;
|
||||||
|
|
||||||
|
QemuThread thread;
|
||||||
|
QemuMutex init_done_lock;
|
||||||
|
QemuCond init_done_cond; /* is thread initialization done? */
|
||||||
|
bool stopping;
|
||||||
|
};
|
||||||
|
|
||||||
|
static __thread IOThread *my_iothread;
|
||||||
|
|
||||||
|
AioContext *qemu_get_current_aio_context(void)
|
||||||
|
{
|
||||||
|
return my_iothread ? my_iothread->ctx : qemu_get_aio_context();
|
||||||
|
}
|
||||||
|
|
||||||
|
static void *iothread_run(void *opaque)
|
||||||
|
{
|
||||||
|
IOThread *iothread = opaque;
|
||||||
|
|
||||||
|
rcu_register_thread();
|
||||||
|
|
||||||
|
my_iothread = iothread;
|
||||||
|
qemu_mutex_lock(&iothread->init_done_lock);
|
||||||
|
iothread->ctx = aio_context_new(&error_abort);
|
||||||
|
qemu_cond_signal(&iothread->init_done_cond);
|
||||||
|
qemu_mutex_unlock(&iothread->init_done_lock);
|
||||||
|
|
||||||
|
while (!atomic_read(&iothread->stopping)) {
|
||||||
|
aio_poll(iothread->ctx, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
rcu_unregister_thread();
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
void iothread_join(IOThread *iothread)
|
||||||
|
{
|
||||||
|
iothread->stopping = true;
|
||||||
|
aio_notify(iothread->ctx);
|
||||||
|
qemu_thread_join(&iothread->thread);
|
||||||
|
qemu_cond_destroy(&iothread->init_done_cond);
|
||||||
|
qemu_mutex_destroy(&iothread->init_done_lock);
|
||||||
|
aio_context_unref(iothread->ctx);
|
||||||
|
g_free(iothread);
|
||||||
|
}
|
||||||
|
|
||||||
|
IOThread *iothread_new(void)
|
||||||
|
{
|
||||||
|
IOThread *iothread = g_new0(IOThread, 1);
|
||||||
|
|
||||||
|
qemu_mutex_init(&iothread->init_done_lock);
|
||||||
|
qemu_cond_init(&iothread->init_done_cond);
|
||||||
|
qemu_thread_create(&iothread->thread, NULL, iothread_run,
|
||||||
|
iothread, QEMU_THREAD_JOINABLE);
|
||||||
|
|
||||||
|
/* Wait for initialization to complete */
|
||||||
|
qemu_mutex_lock(&iothread->init_done_lock);
|
||||||
|
while (iothread->ctx == NULL) {
|
||||||
|
qemu_cond_wait(&iothread->init_done_cond,
|
||||||
|
&iothread->init_done_lock);
|
||||||
|
}
|
||||||
|
qemu_mutex_unlock(&iothread->init_done_lock);
|
||||||
|
return iothread;
|
||||||
|
}
|
||||||
|
|
||||||
|
AioContext *iothread_get_aio_context(IOThread *iothread)
|
||||||
|
{
|
||||||
|
return iothread->ctx;
|
||||||
|
}
|
|
@ -0,0 +1,25 @@
|
||||||
|
/*
|
||||||
|
* Event loop thread implementation for unit tests
|
||||||
|
*
|
||||||
|
* Copyright Red Hat Inc., 2013, 2016
|
||||||
|
*
|
||||||
|
* Authors:
|
||||||
|
* Stefan Hajnoczi <stefanha@redhat.com>
|
||||||
|
* Paolo Bonzini <pbonzini@redhat.com>
|
||||||
|
*
|
||||||
|
* This work is licensed under the terms of the GNU GPL, version 2 or later.
|
||||||
|
* See the COPYING file in the top-level directory.
|
||||||
|
*/
|
||||||
|
#ifndef TEST_IOTHREAD_H
|
||||||
|
#define TEST_IOTHREAD_H
|
||||||
|
|
||||||
|
#include "block/aio.h"
|
||||||
|
#include "qemu/thread.h"
|
||||||
|
|
||||||
|
typedef struct IOThread IOThread;
|
||||||
|
|
||||||
|
IOThread *iothread_new(void);
|
||||||
|
void iothread_join(IOThread *iothread);
|
||||||
|
AioContext *iothread_get_aio_context(IOThread *iothread);
|
||||||
|
|
||||||
|
#endif
|
|
@ -0,0 +1,213 @@
|
||||||
|
/*
|
||||||
|
* AioContext multithreading tests
|
||||||
|
*
|
||||||
|
* Copyright Red Hat, Inc. 2016
|
||||||
|
*
|
||||||
|
* Authors:
|
||||||
|
* Paolo Bonzini <pbonzini@redhat.com>
|
||||||
|
*
|
||||||
|
* This work is licensed under the terms of the GNU LGPL, version 2 or later.
|
||||||
|
* See the COPYING.LIB file in the top-level directory.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "qemu/osdep.h"
|
||||||
|
#include <glib.h>
|
||||||
|
#include "block/aio.h"
|
||||||
|
#include "qapi/error.h"
|
||||||
|
#include "qemu/coroutine.h"
|
||||||
|
#include "qemu/thread.h"
|
||||||
|
#include "qemu/error-report.h"
|
||||||
|
#include "iothread.h"
|
||||||
|
|
||||||
|
/* AioContext management */
|
||||||
|
|
||||||
|
#define NUM_CONTEXTS 5
|
||||||
|
|
||||||
|
static IOThread *threads[NUM_CONTEXTS];
|
||||||
|
static AioContext *ctx[NUM_CONTEXTS];
|
||||||
|
static __thread int id = -1;
|
||||||
|
|
||||||
|
static QemuEvent done_event;
|
||||||
|
|
||||||
|
/* Run a function synchronously on a remote iothread. */
|
||||||
|
|
||||||
|
typedef struct CtxRunData {
|
||||||
|
QEMUBHFunc *cb;
|
||||||
|
void *arg;
|
||||||
|
} CtxRunData;
|
||||||
|
|
||||||
|
static void ctx_run_bh_cb(void *opaque)
|
||||||
|
{
|
||||||
|
CtxRunData *data = opaque;
|
||||||
|
|
||||||
|
data->cb(data->arg);
|
||||||
|
qemu_event_set(&done_event);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void ctx_run(int i, QEMUBHFunc *cb, void *opaque)
|
||||||
|
{
|
||||||
|
CtxRunData data = {
|
||||||
|
.cb = cb,
|
||||||
|
.arg = opaque
|
||||||
|
};
|
||||||
|
|
||||||
|
qemu_event_reset(&done_event);
|
||||||
|
aio_bh_schedule_oneshot(ctx[i], ctx_run_bh_cb, &data);
|
||||||
|
qemu_event_wait(&done_event);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Starting the iothreads. */
|
||||||
|
|
||||||
|
static void set_id_cb(void *opaque)
|
||||||
|
{
|
||||||
|
int *i = opaque;
|
||||||
|
|
||||||
|
id = *i;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void create_aio_contexts(void)
|
||||||
|
{
|
||||||
|
int i;
|
||||||
|
|
||||||
|
for (i = 0; i < NUM_CONTEXTS; i++) {
|
||||||
|
threads[i] = iothread_new();
|
||||||
|
ctx[i] = iothread_get_aio_context(threads[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
qemu_event_init(&done_event, false);
|
||||||
|
for (i = 0; i < NUM_CONTEXTS; i++) {
|
||||||
|
ctx_run(i, set_id_cb, &i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Stopping the iothreads. */
|
||||||
|
|
||||||
|
static void join_aio_contexts(void)
|
||||||
|
{
|
||||||
|
int i;
|
||||||
|
|
||||||
|
for (i = 0; i < NUM_CONTEXTS; i++) {
|
||||||
|
aio_context_ref(ctx[i]);
|
||||||
|
}
|
||||||
|
for (i = 0; i < NUM_CONTEXTS; i++) {
|
||||||
|
iothread_join(threads[i]);
|
||||||
|
}
|
||||||
|
for (i = 0; i < NUM_CONTEXTS; i++) {
|
||||||
|
aio_context_unref(ctx[i]);
|
||||||
|
}
|
||||||
|
qemu_event_destroy(&done_event);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Basic test for the stuff above. */
|
||||||
|
|
||||||
|
static void test_lifecycle(void)
|
||||||
|
{
|
||||||
|
create_aio_contexts();
|
||||||
|
join_aio_contexts();
|
||||||
|
}
|
||||||
|
|
||||||
|
/* aio_co_schedule test. */
|
||||||
|
|
||||||
|
static Coroutine *to_schedule[NUM_CONTEXTS];
|
||||||
|
|
||||||
|
static bool now_stopping;
|
||||||
|
|
||||||
|
static int count_retry;
|
||||||
|
static int count_here;
|
||||||
|
static int count_other;
|
||||||
|
|
||||||
|
static bool schedule_next(int n)
|
||||||
|
{
|
||||||
|
Coroutine *co;
|
||||||
|
|
||||||
|
co = atomic_xchg(&to_schedule[n], NULL);
|
||||||
|
if (!co) {
|
||||||
|
atomic_inc(&count_retry);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (n == id) {
|
||||||
|
atomic_inc(&count_here);
|
||||||
|
} else {
|
||||||
|
atomic_inc(&count_other);
|
||||||
|
}
|
||||||
|
|
||||||
|
aio_co_schedule(ctx[n], co);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void finish_cb(void *opaque)
|
||||||
|
{
|
||||||
|
schedule_next(id);
|
||||||
|
}
|
||||||
|
|
||||||
|
static coroutine_fn void test_multi_co_schedule_entry(void *opaque)
|
||||||
|
{
|
||||||
|
g_assert(to_schedule[id] == NULL);
|
||||||
|
atomic_mb_set(&to_schedule[id], qemu_coroutine_self());
|
||||||
|
|
||||||
|
while (!atomic_mb_read(&now_stopping)) {
|
||||||
|
int n;
|
||||||
|
|
||||||
|
n = g_test_rand_int_range(0, NUM_CONTEXTS);
|
||||||
|
schedule_next(n);
|
||||||
|
qemu_coroutine_yield();
|
||||||
|
|
||||||
|
g_assert(to_schedule[id] == NULL);
|
||||||
|
atomic_mb_set(&to_schedule[id], qemu_coroutine_self());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void test_multi_co_schedule(int seconds)
|
||||||
|
{
|
||||||
|
int i;
|
||||||
|
|
||||||
|
count_here = count_other = count_retry = 0;
|
||||||
|
now_stopping = false;
|
||||||
|
|
||||||
|
create_aio_contexts();
|
||||||
|
for (i = 0; i < NUM_CONTEXTS; i++) {
|
||||||
|
Coroutine *co1 = qemu_coroutine_create(test_multi_co_schedule_entry, NULL);
|
||||||
|
aio_co_schedule(ctx[i], co1);
|
||||||
|
}
|
||||||
|
|
||||||
|
g_usleep(seconds * 1000000);
|
||||||
|
|
||||||
|
atomic_mb_set(&now_stopping, true);
|
||||||
|
for (i = 0; i < NUM_CONTEXTS; i++) {
|
||||||
|
ctx_run(i, finish_cb, NULL);
|
||||||
|
to_schedule[i] = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
join_aio_contexts();
|
||||||
|
g_test_message("scheduled %d, queued %d, retry %d, total %d\n",
|
||||||
|
count_other, count_here, count_retry,
|
||||||
|
count_here + count_other + count_retry);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void test_multi_co_schedule_1(void)
|
||||||
|
{
|
||||||
|
test_multi_co_schedule(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void test_multi_co_schedule_10(void)
|
||||||
|
{
|
||||||
|
test_multi_co_schedule(10);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* End of tests. */
|
||||||
|
|
||||||
|
int main(int argc, char **argv)
|
||||||
|
{
|
||||||
|
init_clocks();
|
||||||
|
|
||||||
|
g_test_init(&argc, &argv, NULL);
|
||||||
|
g_test_add_func("/aio/multi/lifecycle", test_lifecycle);
|
||||||
|
if (g_test_quick()) {
|
||||||
|
g_test_add_func("/aio/multi/schedule", test_multi_co_schedule_1);
|
||||||
|
} else {
|
||||||
|
g_test_add_func("/aio/multi/schedule", test_multi_co_schedule_10);
|
||||||
|
}
|
||||||
|
return g_test_run();
|
||||||
|
}
|
65
util/async.c
65
util/async.c
|
@ -31,6 +31,8 @@
|
||||||
#include "qemu/main-loop.h"
|
#include "qemu/main-loop.h"
|
||||||
#include "qemu/atomic.h"
|
#include "qemu/atomic.h"
|
||||||
#include "block/raw-aio.h"
|
#include "block/raw-aio.h"
|
||||||
|
#include "qemu/coroutine_int.h"
|
||||||
|
#include "trace.h"
|
||||||
|
|
||||||
/***********************************************************/
|
/***********************************************************/
|
||||||
/* bottom halves (can be seen as timers which expire ASAP) */
|
/* bottom halves (can be seen as timers which expire ASAP) */
|
||||||
|
@ -275,6 +277,9 @@ aio_ctx_finalize(GSource *source)
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
assert(QSLIST_EMPTY(&ctx->scheduled_coroutines));
|
||||||
|
qemu_bh_delete(ctx->co_schedule_bh);
|
||||||
|
|
||||||
qemu_lockcnt_lock(&ctx->list_lock);
|
qemu_lockcnt_lock(&ctx->list_lock);
|
||||||
assert(!qemu_lockcnt_count(&ctx->list_lock));
|
assert(!qemu_lockcnt_count(&ctx->list_lock));
|
||||||
while (ctx->first_bh) {
|
while (ctx->first_bh) {
|
||||||
|
@ -364,6 +369,28 @@ static bool event_notifier_poll(void *opaque)
|
||||||
return atomic_read(&ctx->notified);
|
return atomic_read(&ctx->notified);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void co_schedule_bh_cb(void *opaque)
|
||||||
|
{
|
||||||
|
AioContext *ctx = opaque;
|
||||||
|
QSLIST_HEAD(, Coroutine) straight, reversed;
|
||||||
|
|
||||||
|
QSLIST_MOVE_ATOMIC(&reversed, &ctx->scheduled_coroutines);
|
||||||
|
QSLIST_INIT(&straight);
|
||||||
|
|
||||||
|
while (!QSLIST_EMPTY(&reversed)) {
|
||||||
|
Coroutine *co = QSLIST_FIRST(&reversed);
|
||||||
|
QSLIST_REMOVE_HEAD(&reversed, co_scheduled_next);
|
||||||
|
QSLIST_INSERT_HEAD(&straight, co, co_scheduled_next);
|
||||||
|
}
|
||||||
|
|
||||||
|
while (!QSLIST_EMPTY(&straight)) {
|
||||||
|
Coroutine *co = QSLIST_FIRST(&straight);
|
||||||
|
QSLIST_REMOVE_HEAD(&straight, co_scheduled_next);
|
||||||
|
trace_aio_co_schedule_bh_cb(ctx, co);
|
||||||
|
qemu_coroutine_enter(co);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
AioContext *aio_context_new(Error **errp)
|
AioContext *aio_context_new(Error **errp)
|
||||||
{
|
{
|
||||||
int ret;
|
int ret;
|
||||||
|
@ -379,6 +406,10 @@ AioContext *aio_context_new(Error **errp)
|
||||||
}
|
}
|
||||||
g_source_set_can_recurse(&ctx->source, true);
|
g_source_set_can_recurse(&ctx->source, true);
|
||||||
qemu_lockcnt_init(&ctx->list_lock);
|
qemu_lockcnt_init(&ctx->list_lock);
|
||||||
|
|
||||||
|
ctx->co_schedule_bh = aio_bh_new(ctx, co_schedule_bh_cb, ctx);
|
||||||
|
QSLIST_INIT(&ctx->scheduled_coroutines);
|
||||||
|
|
||||||
aio_set_event_notifier(ctx, &ctx->notifier,
|
aio_set_event_notifier(ctx, &ctx->notifier,
|
||||||
false,
|
false,
|
||||||
(EventNotifierHandler *)
|
(EventNotifierHandler *)
|
||||||
|
@ -402,6 +433,40 @@ fail:
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void aio_co_schedule(AioContext *ctx, Coroutine *co)
|
||||||
|
{
|
||||||
|
trace_aio_co_schedule(ctx, co);
|
||||||
|
QSLIST_INSERT_HEAD_ATOMIC(&ctx->scheduled_coroutines,
|
||||||
|
co, co_scheduled_next);
|
||||||
|
qemu_bh_schedule(ctx->co_schedule_bh);
|
||||||
|
}
|
||||||
|
|
||||||
|
void aio_co_wake(struct Coroutine *co)
|
||||||
|
{
|
||||||
|
AioContext *ctx;
|
||||||
|
|
||||||
|
/* Read coroutine before co->ctx. Matches smp_wmb in
|
||||||
|
* qemu_coroutine_enter.
|
||||||
|
*/
|
||||||
|
smp_read_barrier_depends();
|
||||||
|
ctx = atomic_read(&co->ctx);
|
||||||
|
|
||||||
|
if (ctx != qemu_get_current_aio_context()) {
|
||||||
|
aio_co_schedule(ctx, co);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (qemu_in_coroutine()) {
|
||||||
|
Coroutine *self = qemu_coroutine_self();
|
||||||
|
assert(self != co);
|
||||||
|
QSIMPLEQ_INSERT_TAIL(&self->co_queue_wakeup, co, co_queue_next);
|
||||||
|
} else {
|
||||||
|
aio_context_acquire(ctx);
|
||||||
|
qemu_coroutine_enter(co);
|
||||||
|
aio_context_release(ctx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void aio_context_ref(AioContext *ctx)
|
void aio_context_ref(AioContext *ctx)
|
||||||
{
|
{
|
||||||
g_source_ref(&ctx->source);
|
g_source_ref(&ctx->source);
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
#include "qemu/atomic.h"
|
#include "qemu/atomic.h"
|
||||||
#include "qemu/coroutine.h"
|
#include "qemu/coroutine.h"
|
||||||
#include "qemu/coroutine_int.h"
|
#include "qemu/coroutine_int.h"
|
||||||
|
#include "block/aio.h"
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
POOL_BATCH_SIZE = 64,
|
POOL_BATCH_SIZE = 64,
|
||||||
|
@ -114,6 +115,13 @@ void qemu_coroutine_enter(Coroutine *co)
|
||||||
}
|
}
|
||||||
|
|
||||||
co->caller = self;
|
co->caller = self;
|
||||||
|
co->ctx = qemu_get_current_aio_context();
|
||||||
|
|
||||||
|
/* Store co->ctx before anything that stores co. Matches
|
||||||
|
* barrier in aio_co_wake.
|
||||||
|
*/
|
||||||
|
smp_wmb();
|
||||||
|
|
||||||
ret = qemu_coroutine_switch(self, co, COROUTINE_ENTER);
|
ret = qemu_coroutine_switch(self, co, COROUTINE_ENTER);
|
||||||
|
|
||||||
qemu_co_queue_run_restart(co);
|
qemu_co_queue_run_restart(co);
|
||||||
|
|
|
@ -6,6 +6,10 @@ run_poll_handlers_end(void *ctx, bool progress) "ctx %p progress %d"
|
||||||
poll_shrink(void *ctx, int64_t old, int64_t new) "ctx %p old %"PRId64" new %"PRId64
|
poll_shrink(void *ctx, int64_t old, int64_t new) "ctx %p old %"PRId64" new %"PRId64
|
||||||
poll_grow(void *ctx, int64_t old, int64_t new) "ctx %p old %"PRId64" new %"PRId64
|
poll_grow(void *ctx, int64_t old, int64_t new) "ctx %p old %"PRId64" new %"PRId64
|
||||||
|
|
||||||
|
# util/async.c
|
||||||
|
aio_co_schedule(void *ctx, void *co) "ctx %p co %p"
|
||||||
|
aio_co_schedule_bh_cb(void *ctx, void *co) "ctx %p co %p"
|
||||||
|
|
||||||
# util/thread-pool.c
|
# util/thread-pool.c
|
||||||
thread_pool_submit(void *pool, void *req, void *opaque) "pool %p req %p opaque %p"
|
thread_pool_submit(void *pool, void *req, void *opaque) "pool %p req %p opaque %p"
|
||||||
thread_pool_complete(void *pool, void *req, void *opaque, int ret) "pool %p req %p opaque %p ret %d"
|
thread_pool_complete(void *pool, void *req, void *opaque, int ret) "pool %p req %p opaque %p ret %d"
|
||||||
|
|
Loading…
Reference in New Issue