diff options
author | Linus Torvalds <torvalds@linux-foundation.org> | 2017-09-11 22:01:44 -0700 |
---|---|---|
committer | Linus Torvalds <torvalds@linux-foundation.org> | 2017-09-11 22:01:44 -0700 |
commit | 8e7757d83d07cc77ee2661e9615a2f9f4ce540cd (patch) | |
tree | b6c15efae2c117e7fbbe56ad0aaa2859d1cd35a8 /net/sunrpc | |
parent | dd198ce7141aa8dd9ffcc9549de422fb055508de (diff) | |
parent | 1bd5d6d08ea7ed0794c8a3908383d6d6fc202cdd (diff) |
Merge tag 'nfs-for-4.14-1' of git://git.linux-nfs.org/projects/trondmy/linux-nfs
Pull NFS client updates from Trond Myklebust:
"Hightlights include:
Stable bugfixes:
- Fix mirror allocation in the writeback code to avoid a use after
free
- Fix the O_DSYNC writes to use the correct byte range
- Fix 2 use after free issues in the I/O code
Features:
- Writeback fixes to split up the inode->i_lock in order to reduce
contention
- RPC client receive fixes to reduce the amount of time the
xprt->transport_lock is held when receiving data from a socket into
am XDR buffer.
- Ditto fixes to reduce contention between call side users of the
rdma rb_lock, and its use in rpcrdma_reply_handler.
- Re-arrange rdma stats to reduce false cacheline sharing.
- Various rdma cleanups and optimisations.
- Refactor the NFSv4.1 exchange id code and clean up the code.
- Const-ify all instances of struct rpc_xprt_ops
Bugfixes:
- Fix the NFSv2 'sec=' mount option.
- NFSv4.1: don't use machine credentials for CLOSE when using
'sec=sys'
- Fix the NFSv3 GRANT callback when the port changes on the server.
- Fix livelock issues with COMMIT
- NFSv4: Use correct inode in _nfs4_opendata_to_nfs4_state() when
doing and NFSv4.1 open by filehandle"
* tag 'nfs-for-4.14-1' of git://git.linux-nfs.org/projects/trondmy/linux-nfs: (69 commits)
NFS: Count the bytes of skipped subrequests in nfs_lock_and_join_requests()
NFS: Don't hold the group lock when calling nfs_release_request()
NFS: Remove pnfs_generic_transfer_commit_list()
NFS: nfs_lock_and_join_requests and nfs_scan_commit_list can deadlock
NFS: Fix 2 use after free issues in the I/O code
NFS: Sync the correct byte range during synchronous writes
lockd: Delete an error message for a failed memory allocation in reclaimer()
NFS: remove jiffies field from access cache
NFS: flush data when locking a file to ensure cache coherence for mmap.
SUNRPC: remove some dead code.
NFS: don't expect errors from mempool_alloc().
xprtrdma: Use xprt_pin_rqst in rpcrdma_reply_handler
xprtrdma: Re-arrange struct rx_stats
NFS: Fix NFSv2 security settings
NFSv4.1: don't use machine credentials for CLOSE when using 'sec=sys'
SUNRPC: ECONNREFUSED should cause a rebind.
NFS: Remove unused parameter gfp_flags from nfs_pageio_init()
NFSv4: Fix up mirror allocation
SUNRPC: Add a separate spinlock to protect the RPC request receive list
SUNRPC: Cleanup xs_tcp_read_common()
...
Diffstat (limited to 'net/sunrpc')
-rw-r--r-- | net/sunrpc/backchannel_rqst.c | 4 | ||||
-rw-r--r-- | net/sunrpc/clnt.c | 12 | ||||
-rw-r--r-- | net/sunrpc/svcsock.c | 6 | ||||
-rw-r--r-- | net/sunrpc/xprt.c | 57 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/backchannel.c | 71 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/fmr_ops.c | 10 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/frwr_ops.c | 12 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/rpc_rdma.c | 902 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 9 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/transport.c | 8 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/verbs.c | 22 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/xprt_rdma.h | 63 | ||||
-rw-r--r-- | net/sunrpc/xprtsock.c | 90 |
13 files changed, 712 insertions, 554 deletions
diff --git a/net/sunrpc/backchannel_rqst.c b/net/sunrpc/backchannel_rqst.c index ac701c28f44f..c2c68a15b59d 100644 --- a/net/sunrpc/backchannel_rqst.c +++ b/net/sunrpc/backchannel_rqst.c @@ -171,10 +171,10 @@ int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs) /* * Add the temporary list to the backchannel preallocation list */ - spin_lock_bh(&xprt->bc_pa_lock); + spin_lock(&xprt->bc_pa_lock); list_splice(&tmp_list, &xprt->bc_pa_list); xprt_inc_alloc_count(xprt, min_reqs); - spin_unlock_bh(&xprt->bc_pa_lock); + spin_unlock(&xprt->bc_pa_lock); dprintk("RPC: setup backchannel transport done\n"); return 0; diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c index 2e49d1f892b7..2ad827db2704 100644 --- a/net/sunrpc/clnt.c +++ b/net/sunrpc/clnt.c @@ -1903,6 +1903,14 @@ call_connect_status(struct rpc_task *task) task->tk_status = 0; switch (status) { case -ECONNREFUSED: + /* A positive refusal suggests a rebind is needed. */ + if (RPC_IS_SOFTCONN(task)) + break; + if (clnt->cl_autobind) { + rpc_force_rebind(clnt); + task->tk_action = call_bind; + return; + } case -ECONNRESET: case -ECONNABORTED: case -ENETUNREACH: @@ -2139,10 +2147,6 @@ call_status(struct rpc_task *task) rpc_delay(task, 3*HZ); case -ETIMEDOUT: task->tk_action = call_timeout; - if (!(task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) - && task->tk_client->cl_discrtry) - xprt_conditional_disconnect(req->rq_xprt, - req->rq_connect_cookie); break; case -ECONNREFUSED: case -ECONNRESET: diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c index 399fab5d1936..ff8e06cd067e 100644 --- a/net/sunrpc/svcsock.c +++ b/net/sunrpc/svcsock.c @@ -1013,7 +1013,7 @@ static int receive_cb_reply(struct svc_sock *svsk, struct svc_rqst *rqstp) if (!bc_xprt) return -EAGAIN; - spin_lock_bh(&bc_xprt->transport_lock); + spin_lock(&bc_xprt->recv_lock); req = xprt_lookup_rqst(bc_xprt, xid); if (!req) goto unlock_notfound; @@ -1031,7 +1031,7 @@ static int receive_cb_reply(struct svc_sock *svsk, struct svc_rqst *rqstp) memcpy(dst->iov_base, src->iov_base, src->iov_len); xprt_complete_rqst(req->rq_task, rqstp->rq_arg.len); rqstp->rq_arg.len = 0; - spin_unlock_bh(&bc_xprt->transport_lock); + spin_unlock(&bc_xprt->recv_lock); return 0; unlock_notfound: printk(KERN_NOTICE @@ -1040,7 +1040,7 @@ unlock_notfound: __func__, ntohl(calldir), bc_xprt, ntohl(xid)); unlock_eagain: - spin_unlock_bh(&bc_xprt->transport_lock); + spin_unlock(&bc_xprt->recv_lock); return -EAGAIN; } diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index 4654a9934269..e741ec2b4d8e 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -844,6 +844,50 @@ struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid) } EXPORT_SYMBOL_GPL(xprt_lookup_rqst); +/** + * xprt_pin_rqst - Pin a request on the transport receive list + * @req: Request to pin + * + * Caller must ensure this is atomic with the call to xprt_lookup_rqst() + * so should be holding the xprt transport lock. + */ +void xprt_pin_rqst(struct rpc_rqst *req) +{ + set_bit(RPC_TASK_MSG_RECV, &req->rq_task->tk_runstate); +} +EXPORT_SYMBOL_GPL(xprt_pin_rqst); + +/** + * xprt_unpin_rqst - Unpin a request on the transport receive list + * @req: Request to pin + * + * Caller should be holding the xprt transport lock. + */ +void xprt_unpin_rqst(struct rpc_rqst *req) +{ + struct rpc_task *task = req->rq_task; + + clear_bit(RPC_TASK_MSG_RECV, &task->tk_runstate); + if (test_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate)) + wake_up_bit(&task->tk_runstate, RPC_TASK_MSG_RECV); +} +EXPORT_SYMBOL_GPL(xprt_unpin_rqst); + +static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req) +__must_hold(&req->rq_xprt->recv_lock) +{ + struct rpc_task *task = req->rq_task; + + if (task && test_bit(RPC_TASK_MSG_RECV, &task->tk_runstate)) { + spin_unlock(&req->rq_xprt->recv_lock); + set_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate); + wait_on_bit(&task->tk_runstate, RPC_TASK_MSG_RECV, + TASK_UNINTERRUPTIBLE); + clear_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate); + spin_lock(&req->rq_xprt->recv_lock); + } +} + static void xprt_update_rtt(struct rpc_task *task) { struct rpc_rqst *req = task->tk_rqstp; @@ -966,13 +1010,13 @@ void xprt_transmit(struct rpc_task *task) /* * Add to the list only if we're expecting a reply */ - spin_lock_bh(&xprt->transport_lock); /* Update the softirq receive buffer */ memcpy(&req->rq_private_buf, &req->rq_rcv_buf, sizeof(req->rq_private_buf)); /* Add request to the receive list */ + spin_lock(&xprt->recv_lock); list_add_tail(&req->rq_list, &xprt->recv); - spin_unlock_bh(&xprt->transport_lock); + spin_unlock(&xprt->recv_lock); xprt_reset_majortimeo(req); /* Turn off autodisconnect */ del_singleshot_timer_sync(&xprt->timer); @@ -1287,12 +1331,16 @@ void xprt_release(struct rpc_task *task) task->tk_ops->rpc_count_stats(task, task->tk_calldata); else if (task->tk_client) rpc_count_iostats(task, task->tk_client->cl_metrics); + spin_lock(&xprt->recv_lock); + if (!list_empty(&req->rq_list)) { + list_del(&req->rq_list); + xprt_wait_on_pinned_rqst(req); + } + spin_unlock(&xprt->recv_lock); spin_lock_bh(&xprt->transport_lock); xprt->ops->release_xprt(xprt, task); if (xprt->ops->release_request) xprt->ops->release_request(task); - if (!list_empty(&req->rq_list)) - list_del(&req->rq_list); xprt->last_used = jiffies; xprt_schedule_autodisconnect(xprt); spin_unlock_bh(&xprt->transport_lock); @@ -1318,6 +1366,7 @@ static void xprt_init(struct rpc_xprt *xprt, struct net *net) spin_lock_init(&xprt->transport_lock); spin_lock_init(&xprt->reserve_lock); + spin_lock_init(&xprt->recv_lock); INIT_LIST_HEAD(&xprt->free); INIT_LIST_HEAD(&xprt->recv); diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c index 03f6b5840764..d31d0ac5ada9 100644 --- a/net/sunrpc/xprtrdma/backchannel.c +++ b/net/sunrpc/xprtrdma/backchannel.c @@ -49,6 +49,7 @@ static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt, if (IS_ERR(rb)) goto out_fail; req->rl_rdmabuf = rb; + xdr_buf_init(&req->rl_hdrbuf, rb->rg_base, rdmab_length(rb)); size = r_xprt->rx_data.inline_rsize; rb = rpcrdma_alloc_regbuf(size, DMA_TO_DEVICE, GFP_KERNEL); @@ -202,20 +203,24 @@ size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *xprt) */ int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst) { - struct rpc_xprt *xprt = rqst->rq_xprt; - struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); + struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt); struct rpcrdma_req *req = rpcr_to_rdmar(rqst); - struct rpcrdma_msg *headerp; - - headerp = rdmab_to_msg(req->rl_rdmabuf); - headerp->rm_xid = rqst->rq_xid; - headerp->rm_vers = rpcrdma_version; - headerp->rm_credit = - cpu_to_be32(r_xprt->rx_buf.rb_bc_srv_max_requests); - headerp->rm_type = rdma_msg; - headerp->rm_body.rm_chunks[0] = xdr_zero; - headerp->rm_body.rm_chunks[1] = xdr_zero; - headerp->rm_body.rm_chunks[2] = xdr_zero; + __be32 *p; + + rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0); + xdr_init_encode(&req->rl_stream, &req->rl_hdrbuf, + req->rl_rdmabuf->rg_base); + + p = xdr_reserve_space(&req->rl_stream, 28); + if (unlikely(!p)) + return -EIO; + *p++ = rqst->rq_xid; + *p++ = rpcrdma_version; + *p++ = cpu_to_be32(r_xprt->rx_buf.rb_bc_srv_max_requests); + *p++ = rdma_msg; + *p++ = xdr_zero; + *p++ = xdr_zero; + *p = xdr_zero; if (!rpcrdma_prepare_send_sges(&r_xprt->rx_ia, req, RPCRDMA_HDRLEN_MIN, &rqst->rq_snd_buf, rpcrdma_noch)) @@ -271,9 +276,6 @@ void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst) * @xprt: transport receiving the call * @rep: receive buffer containing the call * - * Called in the RPC reply handler, which runs in a tasklet. - * Be quick about it. - * * Operational assumptions: * o Backchannel credits are ignored, just as the NFS server * forechannel currently does @@ -284,7 +286,6 @@ void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep) { struct rpc_xprt *xprt = &r_xprt->rx_xprt; - struct rpcrdma_msg *headerp; struct svc_serv *bc_serv; struct rpcrdma_req *req; struct rpc_rqst *rqst; @@ -292,24 +293,15 @@ void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt, size_t size; __be32 *p; - headerp = rdmab_to_msg(rep->rr_rdmabuf); + p = xdr_inline_decode(&rep->rr_stream, 0); + size = xdr_stream_remaining(&rep->rr_stream); + #ifdef RPCRDMA_BACKCHANNEL_DEBUG pr_info("RPC: %s: callback XID %08x, length=%u\n", - __func__, be32_to_cpu(headerp->rm_xid), rep->rr_len); - pr_info("RPC: %s: %*ph\n", __func__, rep->rr_len, headerp); + __func__, be32_to_cpup(p), size); + pr_info("RPC: %s: %*ph\n", __func__, size, p); #endif - /* Sanity check: - * Need at least enough bytes for RPC/RDMA header, as code - * here references the header fields by array offset. Also, - * backward calls are always inline, so ensure there - * are some bytes beyond the RPC/RDMA header. - */ - if (rep->rr_len < RPCRDMA_HDRLEN_MIN + 24) - goto out_short; - p = (__be32 *)((unsigned char *)headerp + RPCRDMA_HDRLEN_MIN); - size = rep->rr_len - RPCRDMA_HDRLEN_MIN; - /* Grab a free bc rqst */ spin_lock(&xprt->bc_pa_lock); if (list_empty(&xprt->bc_pa_list)) { @@ -325,7 +317,7 @@ void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt, /* Prepare rqst */ rqst->rq_reply_bytes_recvd = 0; rqst->rq_bytes_sent = 0; - rqst->rq_xid = headerp->rm_xid; + rqst->rq_xid = *p; rqst->rq_private_buf.len = size; set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state); @@ -337,9 +329,9 @@ void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt, buf->len = size; /* The receive buffer has to be hooked to the rpcrdma_req - * so that it can be reposted after the server is done - * parsing it but just before sending the backward - * direction reply. + * so that it is not released while the req is pointing + * to its buffer, and so that it can be reposted after + * the Upper Layer is done decoding it. */ req = rpcr_to_rdmar(rqst); dprintk("RPC: %s: attaching rep %p to req %p\n", @@ -367,13 +359,4 @@ out_overflow: * when the connection is re-established. */ return; - -out_short: - pr_warn("RPC/RDMA short backward direction call\n"); - - if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, rep)) - xprt_disconnect_done(xprt); - else - pr_warn("RPC: %s: reposting rep %p\n", - __func__, rep); } diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c index d3f84bb1d443..6c7151341194 100644 --- a/net/sunrpc/xprtrdma/fmr_ops.c +++ b/net/sunrpc/xprtrdma/fmr_ops.c @@ -177,7 +177,7 @@ fmr_op_maxpages(struct rpcrdma_xprt *r_xprt) /* Use the ib_map_phys_fmr() verb to register a memory region * for remote access via RDMA READ or RDMA WRITE. */ -static int +static struct rpcrdma_mr_seg * fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, int nsegs, bool writing, struct rpcrdma_mw **out) { @@ -188,7 +188,7 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, mw = rpcrdma_get_mw(r_xprt); if (!mw) - return -ENOBUFS; + return ERR_PTR(-ENOBUFS); pageoff = offset_in_page(seg1->mr_offset); seg1->mr_offset -= pageoff; /* start of page */ @@ -232,13 +232,13 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, mw->mw_offset = dma_pages[0] + pageoff; *out = mw; - return mw->mw_nents; + return seg; out_dmamap_err: pr_err("rpcrdma: failed to DMA map sg %p sg_nents %d\n", mw->mw_sg, i); rpcrdma_put_mw(r_xprt, mw); - return -EIO; + return ERR_PTR(-EIO); out_maperr: pr_err("rpcrdma: ib_map_phys_fmr %u@0x%llx+%i (%d) status %i\n", @@ -247,7 +247,7 @@ out_maperr: ib_dma_unmap_sg(r_xprt->rx_ia.ri_device, mw->mw_sg, mw->mw_nents, mw->mw_dir); rpcrdma_put_mw(r_xprt, mw); - return -EIO; + return ERR_PTR(-EIO); } /* Invalidate all memory regions that were registered for "req". diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c index 6aea36a38bfd..5a936a6a31a3 100644 --- a/net/sunrpc/xprtrdma/frwr_ops.c +++ b/net/sunrpc/xprtrdma/frwr_ops.c @@ -344,7 +344,7 @@ frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc) /* Post a REG_MR Work Request to register a memory region * for remote access via RDMA READ or RDMA WRITE. */ -static int +static struct rpcrdma_mr_seg * frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, int nsegs, bool writing, struct rpcrdma_mw **out) { @@ -364,7 +364,7 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, rpcrdma_defer_mr_recovery(mw); mw = rpcrdma_get_mw(r_xprt); if (!mw) - return -ENOBUFS; + return ERR_PTR(-ENOBUFS); } while (mw->frmr.fr_state != FRMR_IS_INVALID); frmr = &mw->frmr; frmr->fr_state = FRMR_IS_VALID; @@ -429,25 +429,25 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, mw->mw_offset = mr->iova; *out = mw; - return mw->mw_nents; + return seg; out_dmamap_err: pr_err("rpcrdma: failed to DMA map sg %p sg_nents %d\n", mw->mw_sg, i); frmr->fr_state = FRMR_IS_INVALID; rpcrdma_put_mw(r_xprt, mw); - return -EIO; + return ERR_PTR(-EIO); out_mapmr_err: pr_err("rpcrdma: failed to map mr %p (%d/%d)\n", frmr->fr_mr, n, mw->mw_nents); rpcrdma_defer_mr_recovery(mw); - return -EIO; + return ERR_PTR(-EIO); out_senderr: pr_err("rpcrdma: FRMR registration ib_post_send returned %i\n", rc); rpcrdma_defer_mr_recovery(mw); - return -ENOTCONN; + return ERR_PTR(-ENOTCONN); } /* Invalidate all memory regions that were registered for "req". diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index ca4d6e4528f3..f1889f4d4803 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -169,40 +169,41 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt, return rqst->rq_rcv_buf.buflen <= ia->ri_max_inline_read; } -/* Split "vec" on page boundaries into segments. FMR registers pages, - * not a byte range. Other modes coalesce these segments into a single - * MR when they can. +/* Split @vec on page boundaries into SGEs. FMR registers pages, not + * a byte range. Other modes coalesce these SGEs into a single MR + * when they can. + * + * Returns pointer to next available SGE, and bumps the total number + * of SGEs consumed. */ -static int -rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg, int n) +static struct rpcrdma_mr_seg * +rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg, + unsigned int *n) { - size_t page_offset; - u32 remaining; + u32 remaining, page_offset; char *base; base = vec->iov_base; page_offset = offset_in_page(base); remaining = vec->iov_len; - while (remaining && n < RPCRDMA_MAX_SEGS) { - seg[n].mr_page = NULL; - seg[n].mr_offset = base; - seg[n].mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining); - remaining -= seg[n].mr_len; - base += seg[n].mr_len; - ++n; + while (remaining) { + seg->mr_page = NULL; + seg->mr_offset = base; + seg->mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining); + remaining -= seg->mr_len; + base += seg->mr_len; + ++seg; + ++(*n); page_offset = 0; } - return n; + return seg; } -/* - * Chunk assembly from upper layer xdr_buf. - * - * Prepare the passed-in xdr_buf into representation as RPC/RDMA chunk - * elements. Segments are then coalesced when registered, if possible - * within the selected memreg mode. +/* Convert @xdrbuf into SGEs no larger than a page each. As they + * are registered, these SGEs are then coalesced into RDMA segments + * when the selected memreg mode supports it. * - * Returns positive number of segments converted, or a negative errno. + * Returns positive number of SGEs consumed, or a negative errno. */ static int @@ -210,47 +211,41 @@ rpcrdma_convert_iovs(struct rpcrdma_xprt *r_xprt, struct xdr_buf *xdrbuf, unsigned int pos, enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg) { - int len, n, p, page_base; + unsigned long page_base; + unsigned int len, n; struct page **ppages; n = 0; - if (pos == 0) { - n = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, n); - if (n == RPCRDMA_MAX_SEGS) - goto out_overflow; - } + if (pos == 0) + seg = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, &n); len = xdrbuf->page_len; ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT); page_base = offset_in_page(xdrbuf->page_base); - p = 0; - while (len && n < RPCRDMA_MAX_SEGS) { - if (!ppages[p]) { - /* alloc the pagelist for receiving buffer */ - ppages[p] = alloc_page(GFP_ATOMIC); - if (!ppages[p]) + while (len) { + if (unlikely(!*ppages)) { + /* XXX: Certain upper layer operations do + * not provide receive buffer pages. + */ + *ppages = alloc_page(GFP_ATOMIC); + if (!*ppages) return -EAGAIN; } - seg[n].mr_page = ppages[p]; - seg[n].mr_offset = (void *)(unsigned long) page_base; - seg[n].mr_len = min_t(u32, PAGE_SIZE - page_base, len); - if (seg[n].mr_len > PAGE_SIZE) - goto out_overflow; - len -= seg[n].mr_len; + seg->mr_page = *ppages; + seg->mr_offset = (char *)page_base; + seg->mr_len = min_t(u32, PAGE_SIZE - page_base, len); + len -= seg->mr_len; + ++ppages; + ++seg; ++n; - ++p; - page_base = 0; /* page offset only applies to first page */ + page_base = 0; } - /* Message overflows the seg array */ - if (len && n == RPCRDMA_MAX_SEGS) - goto out_overflow; - /* When encoding a Read chunk, the tail iovec contains an * XDR pad and may be omitted. */ if (type == rpcrdma_readch && r_xprt->rx_ia.ri_implicit_roundup) - return n; + goto out; /* When encoding a Write chunk, some servers need to see an * extra segment for non-XDR-aligned Write chunks. The upper @@ -258,30 +253,81 @@ rpcrdma_convert_iovs(struct rpcrdma_xprt *r_xprt, struct xdr_buf *xdrbuf, * for this purpose. */ if (type == rpcrdma_writech && r_xprt->rx_ia.ri_implicit_roundup) - return n; + goto out; - if (xdrbuf->tail[0].iov_len) { - n = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, n); - if (n == RPCRDMA_MAX_SEGS) - goto out_overflow; - } + if (xdrbuf->tail[0].iov_len) + seg = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, &n); +out: + if (unlikely(n > RPCRDMA_MAX_SEGS)) + return -EIO; return n; +} -out_overflow: - pr_err("rpcrdma: segment array overflow\n"); - return -EIO; +static inline int +encode_item_present(struct xdr_stream *xdr) +{ + __be32 *p; + + p = xdr_reserve_space(xdr, sizeof(*p)); + if (unlikely(!p)) + return -EMSGSIZE; + + *p = xdr_one; + return 0; } -static inline __be32 * +static inline int +encode_item_not_present(struct xdr_stream *xdr) +{ + __be32 *p; + + p = xdr_reserve_space(xdr, sizeof(*p)); + if (unlikely(!p)) + return -EMSGSIZE; + + *p = xdr_zero; + return 0; +} + +static void xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mw *mw) { *iptr++ = cpu_to_be32(mw->mw_handle); *iptr++ = cpu_to_be32(mw->mw_length); - return xdr_encode_hyper(iptr, mw->mw_offset); + xdr_encode_hyper(iptr, mw->mw_offset); } -/* XDR-encode the Read list. Supports encoding a list of read +static int +encode_rdma_segment(struct xdr_stream *xdr, struct rpcrdma_mw *mw) +{ + __be32 *p; + + p = xdr_reserve_space(xdr, 4 * sizeof(*p)); + if (unlikely(!p)) + return -EMSGSIZE; + + xdr_encode_rdma_segment(p, mw); + return 0; +} + +static int +encode_read_segment(struct xdr_stream *xdr, struct rpcrdma_mw *mw, + u32 position) +{ + __be32 *p; + + p = xdr_reserve_space(xdr, 6 * sizeof(*p)); + if (unlikely(!p)) + return -EMSGSIZE; + + *p++ = xdr_one; /* Item present */ + *p++ = cpu_to_be32(position); + xdr_encode_rdma_segment(p, mw); + return 0; +} + +/* Register and XDR encode the Read list. Supports encoding a list of read * segments that belong to a single read chunk. * * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64): @@ -290,23 +336,20 @@ xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mw *mw) * N elements, position P (same P for all chunks of same arg!): * 1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0 * - * Returns a pointer to the XDR word in the RDMA header following - * the end of the Read list, or an error pointer. + * Returns zero on success, or a negative errno if a failure occurred. + * @xdr is advanced to the next position in the stream. + * + * Only a single @pos value is currently supported. */ -static __be32 * -rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, - struct rpcrdma_req *req, struct rpc_rqst *rqst, - __be32 *iptr, enum rpcrdma_chunktype rtype) +static noinline int +rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, + struct rpc_rqst *rqst, enum rpcrdma_chunktype rtype) { + struct xdr_stream *xdr = &req->rl_stream; struct rpcrdma_mr_seg *seg; struct rpcrdma_mw *mw; unsigned int pos; - int n, nsegs; - - if (rtype == rpcrdma_noch) { - *iptr++ = xdr_zero; /* item not present */ - return iptr; - } + int nsegs; pos = rqst->rq_snd_buf.head[0].iov_len; if (rtype == rpcrdma_areadch) @@ -315,40 +358,33 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_snd_buf, pos, rtype, seg); if (nsegs < 0) - return ERR_PTR(nsegs); + return nsegs; do { - n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, - false, &mw); - if (n < 0) - return ERR_PTR(n); + seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, + false, &mw); + if (IS_ERR(seg)) + return PTR_ERR(seg); rpcrdma_push_mw(mw, &req->rl_registered); - *iptr++ = xdr_one; /* item present */ - - /* All read segments in this chunk - * have the same "position". - */ - *iptr++ = cpu_to_be32(pos); - iptr = xdr_encode_rdma_segment(iptr, mw); + if (encode_read_segment(xdr, mw, pos) < 0) + return -EMSGSIZE; dprintk("RPC: %5u %s: pos %u %u@0x%016llx:0x%08x (%s)\n", rqst->rq_task->tk_pid, __func__, pos, mw->mw_length, (unsigned long long)mw->mw_offset, - mw->mw_handle, n < nsegs ? "more" : "last"); + mw->mw_handle, mw->mw_nents < nsegs ? "more" : "last"); r_xprt->rx_stats.read_chunk_count++; - seg += n; - nsegs -= n; + nsegs -= mw->mw_nents; } while (nsegs); - /* Finish Read list */ - *iptr++ = xdr_zero; /* Next item not present */ - return iptr; + return 0; } -/* XDR-encode the Write list. Supports encoding a list containing - * one array of plain segments that belong to a single write chunk. +/* Register and XDR encode the Write list. Supports encoding a list + * containing one array of plain segments that belong to a single + * write chunk. * * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64): * @@ -356,66 +392,65 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, * N elements: * 1 - N - HLOO - HLOO - ... - HLOO - 0 * - * Returns a pointer to the XDR word in the RDMA header following - * the end of the Write list, or an error pointer. + * Returns zero on success, or a negative errno if a failure occurred. + * @xdr is advanced to the next position in the stream. + * + * Only a single Write chunk is currently supported. */ -static __be32 * +static noinline int rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, - struct rpc_rqst *rqst, __be32 *iptr, - enum rpcrdma_chunktype wtype) + struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype) { + struct xdr_stream *xdr = &req->rl_stream; struct rpcrdma_mr_seg *seg; struct rpcrdma_mw *mw; - int n, nsegs, nchunks; + int nsegs, nchunks; __be32 *segcount; - if (wtype != rpcrdma_writech) { - *iptr++ = xdr_zero; /* no Write list present */ - return iptr; - } - seg = req->rl_segments; nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, rqst->rq_rcv_buf.head[0].iov_len, wtype, seg); if (nsegs < 0) - return ERR_PTR(nsegs); + return nsegs; - *iptr++ = xdr_one; /* Write list present */ - segcount = iptr++; /* save location of segment count */ + if (encode_item_present(xdr) < 0) + return -EMSGSIZE; + segcount = xdr_reserve_space(xdr, sizeof(*segcount)); + if (unlikely(!segcount)) + return -EMSGSIZE; + /* Actual value encoded below */ nchunks = 0; do { - n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, - true, &mw); - if (n < 0) - return ERR_PTR(n); + seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, + true, &mw); + if (IS_ERR(seg)) + return PTR_ERR(seg); rpcrdma_push_mw(mw, &req->rl_registered); - iptr = xdr_encode_rdma_segment(iptr, mw); + if (encode_rdma_segment(xdr, mw) < 0) + return -EMSGSIZE; dprintk("RPC: %5u %s: %u@0x016%llx:0x%08x (%s)\n", rqst->rq_task->tk_pid, __func__, mw->mw_length, (unsigned long long)mw->mw_offset, - mw->mw_handle, n < nsegs ? "more" : "last"); + mw->mw_handle, mw->mw_nents < nsegs ? "more" : "last"); r_xprt->rx_stats.write_chunk_count++; r_xprt->rx_stats.total_rdma_request += seg->mr_len; nchunks++; - seg += n; - nsegs -= n; + nsegs -= mw->mw_nents; } while (nsegs); /* Update count of segments in this Write chunk */ *segcount = cpu_to_be32(nchunks); - /* Finish Write list */ - *iptr++ = xdr_zero; /* Next item not present */ - return iptr; + return 0; } -/* XDR-encode the Reply chunk. Supports encoding an array of plain - * segments that belong to a single write (reply) chunk. +/* Register and XDR encode the Reply chunk. Supports encoding an array + * of plain segments that belong to a single write (reply) chunk. * * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64): * @@ -423,58 +458,57 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, * N elements: * 1 - N - HLOO - HLOO - ... - HLOO * - * Returns a pointer to the XDR word in the RDMA header following - * the end of the Reply chunk, or an error pointer. + * Returns zero on success, or a negative errno if a failure occurred. + * @xdr is advanced to the next position in the stream. */ -static __be32 * -rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, - struct rpcrdma_req *req, struct rpc_rqst *rqst, - __be32 *iptr, enum rpcrdma_chunktype wtype) +static noinline int +rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, + struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype) { + struct xdr_stream *xdr = &req->rl_stream; struct rpcrdma_mr_seg *seg; struct rpcrdma_mw *mw; - int n, nsegs, nchunks; + int nsegs, nchunks; __be32 *segcount; - if (wtype != rpcrdma_replych) { - *iptr++ = xdr_zero; /* no Reply chunk present */ - return iptr; - } - seg = req->rl_segments; nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, 0, wtype, seg); if (nsegs < 0) - return ERR_PTR(nsegs); + return nsegs; - *iptr++ = xdr_one; /* Reply chunk present */ - segcount = iptr++; /* save location of segment count */ + if (encode_item_present(xdr) < 0) + return -EMSGSIZE; + segcount = xdr_reserve_space(xdr, sizeof(*segcount)); + if (unlikely(!segcount)) + return -EMSGSIZE; + /* Actual value encoded below */ nchunks = 0; do { - n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, - true, &mw); - if (n < 0) - return ERR_PTR(n); + seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, + true, &mw); + if (IS_ERR(seg)) + return PTR_ERR(seg); rpcrdma_push_mw(mw, &req->rl_registered); - iptr = xdr_encode_rdma_segment(iptr, mw); + if (encode_rdma_segment(xdr, mw) < 0) + return -EMSGSIZE; dprintk("RPC: %5u %s: %u@0x%016llx:0x%08x (%s)\n", rqst->rq_task->tk_pid, __func__, mw->mw_length, (unsigned long long)mw->mw_offset, - mw->mw_handle, n < nsegs ? "more" : "last"); + mw->mw_handle, mw->mw_nents < nsegs ? "more" : "last"); r_xprt->rx_stats.reply_chunk_count++; r_xprt->rx_stats.total_rdma_request += seg->mr_len; nchunks++; - seg += n; - nsegs -= n; + nsegs -= mw->mw_nents; } while (nsegs); /* Update count of segments in the Reply chunk */ *segcount = cpu_to_be32(nchunks); - return iptr; + return 0; } /* Prepare the RPC |