mirror of https://gitee.com/openkylin/linux.git
net/ceph: make ceph_msgr_wq non-reentrant
ceph messenger code does a rather complex dancing around multithread workqueue to make sure the same work item isn't executed concurrently on different CPUs. This restriction can be provided by workqueue with WQ_NON_REENTRANT. Make ceph_msgr_wq non-reentrant workqueue with the default concurrency level and remove the QUEUED/BUSY logic. * This removes backoff handling in con_work() but it couldn't reliably block execution of con_work() to begin with - queue_con() can be called after the work started but before BUSY is set. It seems that it was an optimization for a rather cold path and can be safely removed. * The number of concurrent work items is bound by the number of connections and connetions are independent from each other. With the default concurrency level, different connections will be executed independently. Signed-off-by: Tejun Heo <tj@kernel.org> Cc: Sage Weil <sage@newdream.net> Cc: ceph-devel@vger.kernel.org Signed-off-by: Sage Weil <sage@newdream.net>
This commit is contained in:
parent
01e6acc4ea
commit
f363e45fd1
|
@ -110,17 +110,12 @@ struct ceph_msg_pos {
|
|||
|
||||
/*
|
||||
* ceph_connection state bit flags
|
||||
*
|
||||
* QUEUED and BUSY are used together to ensure that only a single
|
||||
* thread is currently opening, reading or writing data to the socket.
|
||||
*/
|
||||
#define LOSSYTX 0 /* we can close channel or drop messages on errors */
|
||||
#define CONNECTING 1
|
||||
#define NEGOTIATING 2
|
||||
#define KEEPALIVE_PENDING 3
|
||||
#define WRITE_PENDING 4 /* we have data ready to send */
|
||||
#define QUEUED 5 /* there is work queued on this connection */
|
||||
#define BUSY 6 /* work is being done */
|
||||
#define STANDBY 8 /* no outgoing messages, socket closed. we keep
|
||||
* the ceph_connection around to maintain shared
|
||||
* state with the peer. */
|
||||
|
|
|
@ -96,7 +96,7 @@ struct workqueue_struct *ceph_msgr_wq;
|
|||
|
||||
int ceph_msgr_init(void)
|
||||
{
|
||||
ceph_msgr_wq = create_workqueue("ceph-msgr");
|
||||
ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_NON_REENTRANT, 0);
|
||||
if (!ceph_msgr_wq) {
|
||||
pr_err("msgr_init failed to create workqueue\n");
|
||||
return -ENOMEM;
|
||||
|
@ -1920,20 +1920,6 @@ static int try_read(struct ceph_connection *con)
|
|||
/*
|
||||
* Atomically queue work on a connection. Bump @con reference to
|
||||
* avoid races with connection teardown.
|
||||
*
|
||||
* There is some trickery going on with QUEUED and BUSY because we
|
||||
* only want a _single_ thread operating on each connection at any
|
||||
* point in time, but we want to use all available CPUs.
|
||||
*
|
||||
* The worker thread only proceeds if it can atomically set BUSY. It
|
||||
* clears QUEUED and does it's thing. When it thinks it's done, it
|
||||
* clears BUSY, then rechecks QUEUED.. if it's set again, it loops
|
||||
* (tries again to set BUSY).
|
||||
*
|
||||
* To queue work, we first set QUEUED, _then_ if BUSY isn't set, we
|
||||
* try to queue work. If that fails (work is already queued, or BUSY)
|
||||
* we give up (work also already being done or is queued) but leave QUEUED
|
||||
* set so that the worker thread will loop if necessary.
|
||||
*/
|
||||
static void queue_con(struct ceph_connection *con)
|
||||
{
|
||||
|
@ -1948,11 +1934,7 @@ static void queue_con(struct ceph_connection *con)
|
|||
return;
|
||||
}
|
||||
|
||||
set_bit(QUEUED, &con->state);
|
||||
if (test_bit(BUSY, &con->state)) {
|
||||
dout("queue_con %p - already BUSY\n", con);
|
||||
con->ops->put(con);
|
||||
} else if (!queue_work(ceph_msgr_wq, &con->work.work)) {
|
||||
if (!queue_delayed_work(ceph_msgr_wq, &con->work, 0)) {
|
||||
dout("queue_con %p - already queued\n", con);
|
||||
con->ops->put(con);
|
||||
} else {
|
||||
|
@ -1967,15 +1949,6 @@ static void con_work(struct work_struct *work)
|
|||
{
|
||||
struct ceph_connection *con = container_of(work, struct ceph_connection,
|
||||
work.work);
|
||||
int backoff = 0;
|
||||
|
||||
more:
|
||||
if (test_and_set_bit(BUSY, &con->state) != 0) {
|
||||
dout("con_work %p BUSY already set\n", con);
|
||||
goto out;
|
||||
}
|
||||
dout("con_work %p start, clearing QUEUED\n", con);
|
||||
clear_bit(QUEUED, &con->state);
|
||||
|
||||
mutex_lock(&con->mutex);
|
||||
|
||||
|
@ -1994,28 +1967,13 @@ static void con_work(struct work_struct *work)
|
|||
try_read(con) < 0 ||
|
||||
try_write(con) < 0) {
|
||||
mutex_unlock(&con->mutex);
|
||||
backoff = 1;
|
||||
ceph_fault(con); /* error/fault path */
|
||||
goto done_unlocked;
|
||||
}
|
||||
|
||||
done:
|
||||
mutex_unlock(&con->mutex);
|
||||
|
||||
done_unlocked:
|
||||
clear_bit(BUSY, &con->state);
|
||||
dout("con->state=%lu\n", con->state);
|
||||
if (test_bit(QUEUED, &con->state)) {
|
||||
if (!backoff || test_bit(OPENING, &con->state)) {
|
||||
dout("con_work %p QUEUED reset, looping\n", con);
|
||||
goto more;
|
||||
}
|
||||
dout("con_work %p QUEUED reset, but just faulted\n", con);
|
||||
clear_bit(QUEUED, &con->state);
|
||||
}
|
||||
dout("con_work %p done\n", con);
|
||||
|
||||
out:
|
||||
con->ops->put(con);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue