mirror of https://gitee.com/openkylin/linux.git
SUNRPC: Improve ordering of transport processing
Since it can take a while before a specific thread gets scheduled, it is better to just implement a first come first served queue mechanism. That way, if a thread is already scheduled and is idle, it can pick up the work to do from the queue. Signed-off-by: Trond Myklebust <trond.myklebust@primarydata.com> Signed-off-by: J. Bruce Fields <bfields@redhat.com>
This commit is contained in:
parent
95da1b3a5a
commit
22700f3c6d
|
@ -46,6 +46,7 @@ struct svc_pool {
|
||||||
struct svc_pool_stats sp_stats; /* statistics on pool operation */
|
struct svc_pool_stats sp_stats; /* statistics on pool operation */
|
||||||
#define SP_TASK_PENDING (0) /* still work to do even if no
|
#define SP_TASK_PENDING (0) /* still work to do even if no
|
||||||
* xprt is queued. */
|
* xprt is queued. */
|
||||||
|
#define SP_CONGESTED (1)
|
||||||
unsigned long sp_flags;
|
unsigned long sp_flags;
|
||||||
} ____cacheline_aligned_in_smp;
|
} ____cacheline_aligned_in_smp;
|
||||||
|
|
||||||
|
|
|
@ -380,7 +380,6 @@ void svc_xprt_do_enqueue(struct svc_xprt *xprt)
|
||||||
struct svc_pool *pool;
|
struct svc_pool *pool;
|
||||||
struct svc_rqst *rqstp = NULL;
|
struct svc_rqst *rqstp = NULL;
|
||||||
int cpu;
|
int cpu;
|
||||||
bool queued = false;
|
|
||||||
|
|
||||||
if (!svc_xprt_has_something_to_do(xprt))
|
if (!svc_xprt_has_something_to_do(xprt))
|
||||||
goto out;
|
goto out;
|
||||||
|
@ -401,58 +400,25 @@ void svc_xprt_do_enqueue(struct svc_xprt *xprt)
|
||||||
|
|
||||||
atomic_long_inc(&pool->sp_stats.packets);
|
atomic_long_inc(&pool->sp_stats.packets);
|
||||||
|
|
||||||
redo_search:
|
dprintk("svc: transport %p put into queue\n", xprt);
|
||||||
|
spin_lock_bh(&pool->sp_lock);
|
||||||
|
list_add_tail(&xprt->xpt_ready, &pool->sp_sockets);
|
||||||
|
pool->sp_stats.sockets_queued++;
|
||||||
|
spin_unlock_bh(&pool->sp_lock);
|
||||||
|
|
||||||
/* find a thread for this xprt */
|
/* find a thread for this xprt */
|
||||||
rcu_read_lock();
|
rcu_read_lock();
|
||||||
list_for_each_entry_rcu(rqstp, &pool->sp_all_threads, rq_all) {
|
list_for_each_entry_rcu(rqstp, &pool->sp_all_threads, rq_all) {
|
||||||
/* Do a lockless check first */
|
if (test_and_set_bit(RQ_BUSY, &rqstp->rq_flags))
|
||||||
if (test_bit(RQ_BUSY, &rqstp->rq_flags))
|
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
/*
|
|
||||||
* Once the xprt has been queued, it can only be dequeued by
|
|
||||||
* the task that intends to service it. All we can do at that
|
|
||||||
* point is to try to wake this thread back up so that it can
|
|
||||||
* do so.
|
|
||||||
*/
|
|
||||||
if (!queued) {
|
|
||||||
spin_lock_bh(&rqstp->rq_lock);
|
|
||||||
if (test_and_set_bit(RQ_BUSY, &rqstp->rq_flags)) {
|
|
||||||
/* already busy, move on... */
|
|
||||||
spin_unlock_bh(&rqstp->rq_lock);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* this one will do */
|
|
||||||
rqstp->rq_xprt = xprt;
|
|
||||||
svc_xprt_get(xprt);
|
|
||||||
spin_unlock_bh(&rqstp->rq_lock);
|
|
||||||
}
|
|
||||||
rcu_read_unlock();
|
|
||||||
|
|
||||||
atomic_long_inc(&pool->sp_stats.threads_woken);
|
atomic_long_inc(&pool->sp_stats.threads_woken);
|
||||||
wake_up_process(rqstp->rq_task);
|
wake_up_process(rqstp->rq_task);
|
||||||
put_cpu();
|
goto out_unlock;
|
||||||
goto out;
|
|
||||||
}
|
|
||||||
rcu_read_unlock();
|
|
||||||
|
|
||||||
/*
|
|
||||||
* We didn't find an idle thread to use, so we need to queue the xprt.
|
|
||||||
* Do so and then search again. If we find one, we can't hook this one
|
|
||||||
* up to it directly but we can wake the thread up in the hopes that it
|
|
||||||
* will pick it up once it searches for a xprt to service.
|
|
||||||
*/
|
|
||||||
if (!queued) {
|
|
||||||
queued = true;
|
|
||||||
dprintk("svc: transport %p put into queue\n", xprt);
|
|
||||||
spin_lock_bh(&pool->sp_lock);
|
|
||||||
list_add_tail(&xprt->xpt_ready, &pool->sp_sockets);
|
|
||||||
pool->sp_stats.sockets_queued++;
|
|
||||||
spin_unlock_bh(&pool->sp_lock);
|
|
||||||
goto redo_search;
|
|
||||||
}
|
}
|
||||||
|
set_bit(SP_CONGESTED, &pool->sp_flags);
|
||||||
rqstp = NULL;
|
rqstp = NULL;
|
||||||
|
out_unlock:
|
||||||
|
rcu_read_unlock();
|
||||||
put_cpu();
|
put_cpu();
|
||||||
out:
|
out:
|
||||||
trace_svc_xprt_do_enqueue(xprt, rqstp);
|
trace_svc_xprt_do_enqueue(xprt, rqstp);
|
||||||
|
@ -721,38 +687,25 @@ rqst_should_sleep(struct svc_rqst *rqstp)
|
||||||
|
|
||||||
static struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout)
|
static struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout)
|
||||||
{
|
{
|
||||||
struct svc_xprt *xprt;
|
|
||||||
struct svc_pool *pool = rqstp->rq_pool;
|
struct svc_pool *pool = rqstp->rq_pool;
|
||||||
long time_left = 0;
|
long time_left = 0;
|
||||||
|
|
||||||
/* rq_xprt should be clear on entry */
|
/* rq_xprt should be clear on entry */
|
||||||
WARN_ON_ONCE(rqstp->rq_xprt);
|
WARN_ON_ONCE(rqstp->rq_xprt);
|
||||||
|
|
||||||
/* Normally we will wait up to 5 seconds for any required
|
rqstp->rq_xprt = svc_xprt_dequeue(pool);
|
||||||
* cache information to be provided.
|
if (rqstp->rq_xprt)
|
||||||
*/
|
goto out_found;
|
||||||
rqstp->rq_chandle.thread_wait = 5*HZ;
|
|
||||||
|
|
||||||
xprt = svc_xprt_dequeue(pool);
|
|
||||||
if (xprt) {
|
|
||||||
rqstp->rq_xprt = xprt;
|
|
||||||
|
|
||||||
/* As there is a shortage of threads and this request
|
|
||||||
* had to be queued, don't allow the thread to wait so
|
|
||||||
* long for cache updates.
|
|
||||||
*/
|
|
||||||
rqstp->rq_chandle.thread_wait = 1*HZ;
|
|
||||||
clear_bit(SP_TASK_PENDING, &pool->sp_flags);
|
|
||||||
return xprt;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We have to be able to interrupt this wait
|
* We have to be able to interrupt this wait
|
||||||
* to bring down the daemons ...
|
* to bring down the daemons ...
|
||||||
*/
|
*/
|
||||||
set_current_state(TASK_INTERRUPTIBLE);
|
set_current_state(TASK_INTERRUPTIBLE);
|
||||||
|
smp_mb__before_atomic();
|
||||||
|
clear_bit(SP_CONGESTED, &pool->sp_flags);
|
||||||
clear_bit(RQ_BUSY, &rqstp->rq_flags);
|
clear_bit(RQ_BUSY, &rqstp->rq_flags);
|
||||||
smp_mb();
|
smp_mb__after_atomic();
|
||||||
|
|
||||||
if (likely(rqst_should_sleep(rqstp)))
|
if (likely(rqst_should_sleep(rqstp)))
|
||||||
time_left = schedule_timeout(timeout);
|
time_left = schedule_timeout(timeout);
|
||||||
|
@ -761,13 +714,11 @@ static struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout)
|
||||||
|
|
||||||
try_to_freeze();
|
try_to_freeze();
|
||||||
|
|
||||||
spin_lock_bh(&rqstp->rq_lock);
|
|
||||||
set_bit(RQ_BUSY, &rqstp->rq_flags);
|
set_bit(RQ_BUSY, &rqstp->rq_flags);
|
||||||
spin_unlock_bh(&rqstp->rq_lock);
|
smp_mb__after_atomic();
|
||||||
|
rqstp->rq_xprt = svc_xprt_dequeue(pool);
|
||||||
xprt = rqstp->rq_xprt;
|
if (rqstp->rq_xprt)
|
||||||
if (xprt != NULL)
|
goto out_found;
|
||||||
return xprt;
|
|
||||||
|
|
||||||
if (!time_left)
|
if (!time_left)
|
||||||
atomic_long_inc(&pool->sp_stats.threads_timedout);
|
atomic_long_inc(&pool->sp_stats.threads_timedout);
|
||||||
|
@ -775,6 +726,15 @@ static struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout)
|
||||||
if (signalled() || kthread_should_stop())
|
if (signalled() || kthread_should_stop())
|
||||||
return ERR_PTR(-EINTR);
|
return ERR_PTR(-EINTR);
|
||||||
return ERR_PTR(-EAGAIN);
|
return ERR_PTR(-EAGAIN);
|
||||||
|
out_found:
|
||||||
|
/* Normally we will wait up to 5 seconds for any required
|
||||||
|
* cache information to be provided.
|
||||||
|
*/
|
||||||
|
if (!test_bit(SP_CONGESTED, &pool->sp_flags))
|
||||||
|
rqstp->rq_chandle.thread_wait = 5*HZ;
|
||||||
|
else
|
||||||
|
rqstp->rq_chandle.thread_wait = 1*HZ;
|
||||||
|
return rqstp->rq_xprt;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void svc_add_new_temp_xprt(struct svc_serv *serv, struct svc_xprt *newxpt)
|
static void svc_add_new_temp_xprt(struct svc_serv *serv, struct svc_xprt *newxpt)
|
||||||
|
|
Loading…
Reference in New Issue