xprtrdma: Pull up sometimes

On some platforms, DMA mapping part of a page is more costly than
copying bytes. Restore the pull-up code and use that when we
think it's going to be faster. The heuristic for now is to pull-up
when the size of the RPC message body fits in the buffer underlying
the head iovec.

Indeed, not involving the I/O MMU can help the RPC/RDMA transport
scale better for tiny I/Os across more RDMA devices. This is because
interaction with the I/O MMU is eliminated, as is handling a Send
completion, for each of these small I/Os. Without the explicit
unmapping, the NIC no longer needs to do a costly internal TLB shoot
down for buffers that are just a handful of bytes.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
This commit is contained in:
Chuck Lever 2019-10-17 14:31:53 -04:00 committed by Anna Schumaker
parent d6764bbd77
commit 614f3c96d7
5 changed files with 85 additions and 7 deletions

View File

@ -532,6 +532,8 @@ DEFINE_WRCH_EVENT(write);
DEFINE_WRCH_EVENT(reply);
TRACE_DEFINE_ENUM(rpcrdma_noch);
TRACE_DEFINE_ENUM(rpcrdma_noch_pullup);
TRACE_DEFINE_ENUM(rpcrdma_noch_mapped);
TRACE_DEFINE_ENUM(rpcrdma_readch);
TRACE_DEFINE_ENUM(rpcrdma_areadch);
TRACE_DEFINE_ENUM(rpcrdma_writech);
@ -540,6 +542,8 @@ TRACE_DEFINE_ENUM(rpcrdma_replych);
#define xprtrdma_show_chunktype(x) \
__print_symbolic(x, \
{ rpcrdma_noch, "inline" }, \
{ rpcrdma_noch_pullup, "pullup" }, \
{ rpcrdma_noch_mapped, "mapped" }, \
{ rpcrdma_readch, "read list" }, \
{ rpcrdma_areadch, "*read list" }, \
{ rpcrdma_writech, "write list" }, \

View File

@ -79,7 +79,7 @@ static int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
*p = xdr_zero;
if (rpcrdma_prepare_send_sges(r_xprt, req, RPCRDMA_HDRLEN_MIN,
&rqst->rq_snd_buf, rpcrdma_noch))
&rqst->rq_snd_buf, rpcrdma_noch_pullup))
return -EIO;
trace_xprtrdma_cb_reply(rqst);

View File

@ -392,7 +392,7 @@ static int rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
unsigned int pos;
int nsegs;
if (rtype == rpcrdma_noch)
if (rtype == rpcrdma_noch_pullup || rtype == rpcrdma_noch_mapped)
goto done;
pos = rqst->rq_snd_buf.head[0].iov_len;
@ -691,6 +691,72 @@ static bool rpcrdma_prepare_tail_iov(struct rpcrdma_req *req,
return false;
}
/* Copy the tail to the end of the head buffer.
*/
static void rpcrdma_pullup_tail_iov(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_req *req,
struct xdr_buf *xdr)
{
unsigned char *dst;
dst = (unsigned char *)xdr->head[0].iov_base;
dst += xdr->head[0].iov_len + xdr->page_len;
memmove(dst, xdr->tail[0].iov_base, xdr->tail[0].iov_len);
r_xprt->rx_stats.pullup_copy_count += xdr->tail[0].iov_len;
}
/* Copy pagelist content into the head buffer.
*/
static void rpcrdma_pullup_pagelist(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_req *req,
struct xdr_buf *xdr)
{
unsigned int len, page_base, remaining;
struct page **ppages;
unsigned char *src, *dst;
dst = (unsigned char *)xdr->head[0].iov_base;
dst += xdr->head[0].iov_len;
ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
page_base = offset_in_page(xdr->page_base);
remaining = xdr->page_len;
while (remaining) {
src = page_address(*ppages);
src += page_base;
len = min_t(unsigned int, PAGE_SIZE - page_base, remaining);
memcpy(dst, src, len);
r_xprt->rx_stats.pullup_copy_count += len;
ppages++;
dst += len;
remaining -= len;
page_base = 0;
}
}
/* Copy the contents of @xdr into @rl_sendbuf and DMA sync it.
* When the head, pagelist, and tail are small, a pull-up copy
* is considerably less costly than DMA mapping the components
* of @xdr.
*
* Assumptions:
* - the caller has already verified that the total length
* of the RPC Call body will fit into @rl_sendbuf.
*/
static bool rpcrdma_prepare_noch_pullup(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_req *req,
struct xdr_buf *xdr)
{
if (unlikely(xdr->tail[0].iov_len))
rpcrdma_pullup_tail_iov(r_xprt, req, xdr);
if (unlikely(xdr->page_len))
rpcrdma_pullup_pagelist(r_xprt, req, xdr);
/* The whole RPC message resides in the head iovec now */
return rpcrdma_prepare_head_iov(r_xprt, req, xdr->len);
}
static bool rpcrdma_prepare_noch_mapped(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_req *req,
struct xdr_buf *xdr)
@ -779,7 +845,11 @@ inline int rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
goto out_unmap;
switch (rtype) {
case rpcrdma_noch:
case rpcrdma_noch_pullup:
if (!rpcrdma_prepare_noch_pullup(r_xprt, req, xdr))
goto out_unmap;
break;
case rpcrdma_noch_mapped:
if (!rpcrdma_prepare_noch_mapped(r_xprt, req, xdr))
goto out_unmap;
break;
@ -827,6 +897,7 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
struct xdr_stream *xdr = &req->rl_stream;
enum rpcrdma_chunktype rtype, wtype;
struct xdr_buf *buf = &rqst->rq_snd_buf;
bool ddp_allowed;
__be32 *p;
int ret;
@ -884,8 +955,9 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
*/
if (rpcrdma_args_inline(r_xprt, rqst)) {
*p++ = rdma_msg;
rtype = rpcrdma_noch;
} else if (ddp_allowed && rqst->rq_snd_buf.flags & XDRBUF_WRITE) {
rtype = buf->len < rdmab_length(req->rl_sendbuf) ?
rpcrdma_noch_pullup : rpcrdma_noch_mapped;
} else if (ddp_allowed && buf->flags & XDRBUF_WRITE) {
*p++ = rdma_msg;
rtype = rpcrdma_readch;
} else {
@ -927,7 +999,7 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
goto out_err;
ret = rpcrdma_prepare_send_sges(r_xprt, req, req->rl_hdrbuf.len,
&rqst->rq_snd_buf, rtype);
buf, rtype);
if (ret)
goto out_err;

View File

@ -1165,7 +1165,7 @@ int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
for (i = 0; i < buf->rb_max_requests; i++) {
struct rpcrdma_req *req;
req = rpcrdma_req_create(r_xprt, RPCRDMA_V1_DEF_INLINE_SIZE,
req = rpcrdma_req_create(r_xprt, RPCRDMA_V1_DEF_INLINE_SIZE * 2,
GFP_KERNEL);
if (!req)
goto out;

View File

@ -554,6 +554,8 @@ void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req);
enum rpcrdma_chunktype {
rpcrdma_noch = 0,
rpcrdma_noch_pullup,
rpcrdma_noch_mapped,
rpcrdma_readch,
rpcrdma_areadch,
rpcrdma_writech,