summaryrefslogtreecommitdiffstats
path: root/net/sunrpc
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2017-11-17 14:18:00 -0800
committerLinus Torvalds <torvalds@linux-foundation.org>2017-11-17 14:18:00 -0800
commitc3e9c04b89059a4c93c792da883ca284de182da5 (patch)
tree8cb58f19e0329f040e6c5bd2269572d8bbe58c16 /net/sunrpc
parente0bcb42e602816415f6fe07313b6fc84932244b7 (diff)
parentfcfa447062b2061e11f68b846d61cbfe60d0d604 (diff)
Merge tag 'nfs-for-4.15-1' of git://git.linux-nfs.org/projects/anna/linux-nfs
Pull NFS client updates from Anna Schumaker: "Stable bugfixes: - Revalidate "." and ".." correctly on open - Avoid RCU usage in tracepoints - Fix ugly referral attributes - Fix a typo in nomigration mount option - Revert "NFS: Move the flock open mode check into nfs_flock()" Features: - Implement a stronger send queue accounting system for NFS over RDMA - Switch some atomics to the new refcount_t type Other bugfixes and cleanups: - Clean up access mode bits - Remove special-case revalidations in nfs_opendir() - Improve invalidating NFS over RDMA memory for async operations that time out - Handle NFS over RDMA replies with a worqueue - Handle NFS over RDMA sends with a workqueue - Fix up replaying interrupted requests - Remove dead NFS over RDMA definitions - Update NFS over RDMA copyright information - Be more consistent with bool initialization and comparisons - Mark expected switch fall throughs - Various sunrpc tracepoint cleanups - Fix various OPEN races - Fix a typo in nfs_rename() - Use common error handling code in nfs_lock_and_join_request() - Check that some structures are properly cleaned up during net_exit() - Remove net pointer from dprintk()s" * tag 'nfs-for-4.15-1' of git://git.linux-nfs.org/projects/anna/linux-nfs: (62 commits) NFS: Revert "NFS: Move the flock open mode check into nfs_flock()" NFS: Fix typo in nomigration mount option nfs: Fix ugly referral attributes NFS: super: mark expected switch fall-throughs sunrpc: remove net pointer from messages nfs: remove net pointer from messages sunrpc: exit_net cleanup check added nfs client: exit_net cleanup check added nfs/write: Use common error handling code in nfs_lock_and_join_requests() NFSv4: Replace closed stateids with the "invalid special stateid" NFSv4: nfs_set_open_stateid must not trigger state recovery for closed state NFSv4: Check the open stateid when searching for expired state NFSv4: Clean up nfs4_delegreturn_done NFSv4: cleanup nfs4_close_done NFSv4: Retry NFS4ERR_OLD_STATEID errors in layoutreturn pNFS: Retry NFS4ERR_OLD_STATEID errors in layoutreturn-on-close NFSv4: Don't try to CLOSE if the stateid 'other' field has changed NFSv4: Retry CLOSE and DELEGRETURN on NFS4ERR_OLD_STATEID. NFS: Fix a typo in nfs_rename() NFSv4: Fix open create exclusive when the server reboots ...
Diffstat (limited to 'net/sunrpc')
-rw-r--r--net/sunrpc/clnt.c14
-rw-r--r--net/sunrpc/rpc_pipe.c8
-rw-r--r--net/sunrpc/rpcb_clnt.c6
-rw-r--r--net/sunrpc/sched.c3
-rw-r--r--net/sunrpc/sunrpc_syms.c3
-rw-r--r--net/sunrpc/xprt.c1
-rw-r--r--net/sunrpc/xprtrdma/backchannel.c6
-rw-r--r--net/sunrpc/xprtrdma/fmr_ops.c19
-rw-r--r--net/sunrpc/xprtrdma/frwr_ops.c27
-rw-r--r--net/sunrpc/xprtrdma/rpc_rdma.c363
-rw-r--r--net/sunrpc/xprtrdma/transport.c19
-rw-r--r--net/sunrpc/xprtrdma/verbs.c236
-rw-r--r--net/sunrpc/xprtrdma/xprt_rdma.h119
-rw-r--r--net/sunrpc/xprtsock.c4
14 files changed, 531 insertions, 297 deletions
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index 2ad827db2704..a801da812f86 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -1491,7 +1491,6 @@ rpc_restart_call(struct rpc_task *task)
}
EXPORT_SYMBOL_GPL(rpc_restart_call);
-#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
const char
*rpc_proc_name(const struct rpc_task *task)
{
@@ -1505,7 +1504,6 @@ const char
} else
return "no proc";
}
-#endif
/*
* 0. Initial state
@@ -1519,6 +1517,7 @@ call_start(struct rpc_task *task)
struct rpc_clnt *clnt = task->tk_client;
int idx = task->tk_msg.rpc_proc->p_statidx;
+ trace_rpc_request(task);
dprintk("RPC: %5u call_start %s%d proc %s (%s)\n", task->tk_pid,
clnt->cl_program->name, clnt->cl_vers,
rpc_proc_name(task),
@@ -1586,6 +1585,7 @@ call_reserveresult(struct rpc_task *task)
switch (status) {
case -ENOMEM:
rpc_delay(task, HZ >> 2);
+ /* fall through */
case -EAGAIN: /* woken up; retry */
task->tk_action = call_retry_reserve;
return;
@@ -1647,10 +1647,13 @@ call_refreshresult(struct rpc_task *task)
/* Use rate-limiting and a max number of retries if refresh
* had status 0 but failed to update the cred.
*/
+ /* fall through */
case -ETIMEDOUT:
rpc_delay(task, 3*HZ);
+ /* fall through */
case -EAGAIN:
status = -EACCES;
+ /* fall through */
case -EKEYEXPIRED:
if (!task->tk_cred_retry)
break;
@@ -1911,6 +1914,7 @@ call_connect_status(struct rpc_task *task)
task->tk_action = call_bind;
return;
}
+ /* fall through */
case -ECONNRESET:
case -ECONNABORTED:
case -ENETUNREACH:
@@ -1924,6 +1928,7 @@ call_connect_status(struct rpc_task *task)
break;
/* retry with existing socket, after a delay */
rpc_delay(task, 3*HZ);
+ /* fall through */
case -EAGAIN:
/* Check for timeouts before looping back to call_bind */
case -ETIMEDOUT:
@@ -2025,6 +2030,7 @@ call_transmit_status(struct rpc_task *task)
rpc_exit(task, task->tk_status);
break;
}
+ /* fall through */
case -ECONNRESET:
case -ECONNABORTED:
case -EADDRINUSE:
@@ -2145,6 +2151,7 @@ call_status(struct rpc_task *task)
* were a timeout.
*/
rpc_delay(task, 3*HZ);
+ /* fall through */
case -ETIMEDOUT:
task->tk_action = call_timeout;
break;
@@ -2152,14 +2159,17 @@ call_status(struct rpc_task *task)
case -ECONNRESET:
case -ECONNABORTED:
rpc_force_rebind(clnt);
+ /* fall through */
case -EADDRINUSE:
rpc_delay(task, 3*HZ);
+ /* fall through */
case -EPIPE:
case -ENOTCONN:
task->tk_action = call_bind;
break;
case -ENOBUFS:
rpc_delay(task, HZ>>2);
+ /* fall through */
case -EAGAIN:
task->tk_action = call_transmit;
break;
diff --git a/net/sunrpc/rpc_pipe.c b/net/sunrpc/rpc_pipe.c
index 61a504fb1ae2..7803f3b6aa53 100644
--- a/net/sunrpc/rpc_pipe.c
+++ b/net/sunrpc/rpc_pipe.c
@@ -1410,8 +1410,8 @@ rpc_fill_super(struct super_block *sb, void *data, int silent)
return PTR_ERR(gssd_dentry);
}
- dprintk("RPC: sending pipefs MOUNT notification for net %p%s\n",
- net, NET_NAME(net));
+ dprintk("RPC: sending pipefs MOUNT notification for net %x%s\n",
+ net->ns.inum, NET_NAME(net));
mutex_lock(&sn->pipefs_sb_lock);
sn->pipefs_sb = sb;
err = blocking_notifier_call_chain(&rpc_pipefs_notifier_list,
@@ -1462,8 +1462,8 @@ static void rpc_kill_sb(struct super_block *sb)
goto out;
}
sn->pipefs_sb = NULL;
- dprintk("RPC: sending pipefs UMOUNT notification for net %p%s\n",
- net, NET_NAME(net));
+ dprintk("RPC: sending pipefs UMOUNT notification for net %x%s\n",
+ net->ns.inum, NET_NAME(net));
blocking_notifier_call_chain(&rpc_pipefs_notifier_list,
RPC_PIPEFS_UMOUNT,
sb);
diff --git a/net/sunrpc/rpcb_clnt.c b/net/sunrpc/rpcb_clnt.c
index ea0676f199c8..c526f8fb37c9 100644
--- a/net/sunrpc/rpcb_clnt.c
+++ b/net/sunrpc/rpcb_clnt.c
@@ -216,9 +216,9 @@ static void rpcb_set_local(struct net *net, struct rpc_clnt *clnt,
smp_wmb();
sn->rpcb_users = 1;
dprintk("RPC: created new rpcb local clients (rpcb_local_clnt: "
- "%p, rpcb_local_clnt4: %p) for net %p%s\n",
- sn->rpcb_local_clnt, sn->rpcb_local_clnt4,
- net, (net == &init_net) ? " (init_net)" : "");
+ "%p, rpcb_local_clnt4: %p) for net %x%s\n",
+ sn->rpcb_local_clnt, sn->rpcb_local_clnt4,
+ net->ns.inum, (net == &init_net) ? " (init_net)" : "");
}
/*
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index 5dea47eb31bb..b1b49edd7c4d 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -274,10 +274,9 @@ static inline void rpc_task_set_debuginfo(struct rpc_task *task)
static void rpc_set_active(struct rpc_task *task)
{
- trace_rpc_task_begin(task->tk_client, task, NULL);
-
rpc_task_set_debuginfo(task);
set_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
+ trace_rpc_task_begin(task->tk_client, task, NULL);
}
/*
diff --git a/net/sunrpc/sunrpc_syms.c b/net/sunrpc/sunrpc_syms.c
index c73de181467a..56f9eff74150 100644
--- a/net/sunrpc/sunrpc_syms.c
+++ b/net/sunrpc/sunrpc_syms.c
@@ -65,10 +65,13 @@ err_proc:
static __net_exit void sunrpc_exit_net(struct net *net)
{
+ struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
+
rpc_pipefs_exit_net(net);
unix_gid_cache_destroy(net);
ip_map_cache_destroy(net);
rpc_proc_exit(net);
+ WARN_ON_ONCE(!list_empty(&sn->all_clients));
}
static struct pernet_operations sunrpc_net_ops = {
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 6160d17a31c4..333b9d697ae5 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -1139,6 +1139,7 @@ void xprt_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)
case -EAGAIN:
xprt_add_backlog(xprt, task);
dprintk("RPC: waiting for request slot\n");
+ /* fall through */
default:
task->tk_status = -EAGAIN;
}
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index 823a781ec89c..8b818bb3518a 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -43,7 +43,7 @@ static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
req = rpcrdma_create_req(r_xprt);
if (IS_ERR(req))
return PTR_ERR(req);
- req->rl_backchannel = true;
+ __set_bit(RPCRDMA_REQ_F_BACKCHANNEL, &req->rl_flags);
rb = rpcrdma_alloc_regbuf(RPCRDMA_HDRBUF_SIZE,
DMA_TO_DEVICE, GFP_KERNEL);
@@ -223,8 +223,8 @@ int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
*p++ = xdr_zero;
*p = xdr_zero;
- if (!rpcrdma_prepare_send_sges(&r_xprt->rx_ia, req, RPCRDMA_HDRLEN_MIN,
- &rqst->rq_snd_buf, rpcrdma_noch))
+ if (rpcrdma_prepare_send_sges(r_xprt, req, RPCRDMA_HDRLEN_MIN,
+ &rqst->rq_snd_buf, rpcrdma_noch))
return -EIO;
return 0;
}
diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c
index fa759dd2b0f3..29fc84c7ff98 100644
--- a/net/sunrpc/xprtrdma/fmr_ops.c
+++ b/net/sunrpc/xprtrdma/fmr_ops.c
@@ -306,28 +306,9 @@ out_reset:
}
}
-/* Use a slow, safe mechanism to invalidate all memory regions
- * that were registered for "req".
- */
-static void
-fmr_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
- bool sync)
-{
- struct rpcrdma_mw *mw;
-
- while (!list_empty(&req->rl_registered)) {
- mw = rpcrdma_pop_mw(&req->rl_registered);
- if (sync)
- fmr_op_recover_mr(mw);
- else
- rpcrdma_defer_mr_recovery(mw);
- }
-}
-
const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops = {
.ro_map = fmr_op_map,
.ro_unmap_sync = fmr_op_unmap_sync,
- .ro_unmap_safe = fmr_op_unmap_safe,
.ro_recover_mr = fmr_op_recover_mr,
.ro_open = fmr_op_open,
.ro_maxpages = fmr_op_maxpages,
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index 35d7517ef0e6..773e66e10a15 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -420,7 +420,6 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
IB_ACCESS_REMOTE_READ;
- rpcrdma_set_signaled(&r_xprt->rx_ep, &reg_wr->wr);
rc = ib_post_send(ia->ri_id->qp, &reg_wr->wr, &bad_wr);
if (rc)
goto out_senderr;
@@ -508,12 +507,6 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct list_head *mws)
f->fr_cqe.done = frwr_wc_localinv_wake;
reinit_completion(&f->fr_linv_done);
- /* Initialize CQ count, since there is always a signaled
- * WR being posted here. The new cqcount depends on how
- * many SQEs are about to be consumed.
- */
- rpcrdma_init_cqcount(&r_xprt->rx_ep, count);
-
/* Transport disconnect drains the receive CQ before it
* replaces the QP. The RPC reply handler won't call us
* unless ri_id->qp is a valid pointer.
@@ -546,7 +539,6 @@ reset_mrs:
/* Find and reset the MRs in the LOCAL_INV WRs that did not
* get posted.
*/
- rpcrdma_init_cqcount(&r_xprt->rx_ep, -count);
while (bad_wr) {
f = container_of(bad_wr, struct rpcrdma_frmr,
fr_invwr);
@@ -559,28 +551,9 @@ reset_mrs:
goto unmap;
}
-/* Use a slow, safe mechanism to invalidate all memory regions
- * that were registered for "req".
- */
-static void
-frwr_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
- bool sync)
-{
- struct rpcrdma_mw *mw;
-
- while (!list_empty(&req->rl_registered)) {
- mw = rpcrdma_pop_mw(&req->rl_registered);
- if (sync)
- frwr_op_recover_mr(mw);
- else
- rpcrdma_defer_mr_recovery(mw);
- }
-}
-
const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops = {
.ro_map = frwr_op_map,
.ro_unmap_sync = frwr_op_unmap_sync,
- .ro_unmap_safe = frwr_op_unmap_safe,
.ro_recover_mr = frwr_op_recover_mr,
.ro_open = frwr_op_open,
.ro_maxpages = frwr_op_maxpages,
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index f1889f4d4803..ed34dc0f144c 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -1,4 +1,5 @@
/*
+ * Copyright (c) 2014-2017 Oracle. All rights reserved.
* Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
*
* This software is available to you under a choice of one of two
@@ -75,11 +76,11 @@ static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs)
/* Maximum Read list size */
maxsegs += 2; /* segment for head and tail buffers */
- size = maxsegs * sizeof(struct rpcrdma_read_chunk);
+ size = maxsegs * rpcrdma_readchunk_maxsz * sizeof(__be32);
/* Minimal Read chunk size */
size += sizeof(__be32); /* segment count */
- size += sizeof(struct rpcrdma_segment);
+ size += rpcrdma_segment_maxsz * sizeof(__be32);
size += sizeof(__be32); /* list discriminator */
dprintk("RPC: %s: max call header size = %u\n",
@@ -102,7 +103,7 @@ static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs)
/* Maximum Write list size */
maxsegs += 2; /* segment for head and tail buffers */
size = sizeof(__be32); /* segment count */
- size += maxsegs * sizeof(struct rpcrdma_segment);
+ size += maxsegs * rpcrdma_segment_maxsz * sizeof(__be32);
size += sizeof(__be32); /* list discriminator */
dprintk("RPC: %s: max reply header size = %u\n",
@@ -511,27 +512,60 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
return 0;
}
-/* Prepare the RPC-over-RDMA header SGE.
+/**
+ * rpcrdma_unmap_sendctx - DMA-unmap Send buffers
+ * @sc: sendctx containing SGEs to unmap
+ *
+ */
+void
+rpcrdma_unmap_sendctx(struct rpcrdma_sendctx *sc)
+{
+ struct rpcrdma_ia *ia = &sc->sc_xprt->rx_ia;
+ struct ib_sge *sge;
+ unsigned int count;
+
+ dprintk("RPC: %s: unmapping %u sges for sc=%p\n",
+ __func__, sc->sc_unmap_count, sc);
+
+ /* The first two SGEs contain the transport header and
+ * the inline buffer. These are always left mapped so
+ * they can be cheaply re-used.
+ */
+ sge = &sc->sc_sges[2];
+ for (count = sc->sc_unmap_count; count; ++sge, --count)
+ ib_dma_unmap_page(ia->ri_device,
+ sge->addr, sge->length, DMA_TO_DEVICE);
+
+ if (test_and_clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &sc->sc_req->rl_flags)) {
+ smp_mb__after_atomic();
+ wake_up_bit(&sc->sc_req->rl_flags, RPCRDMA_REQ_F_TX_RESOURCES);
+ }
+}
+
+/* Prepare an SGE for the RPC-over-RDMA transport header.
*/
static bool
rpcrdma_prepare_hdr_sge(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
u32 len)
{
+ struct rpcrdma_sendctx *sc = req->rl_sendctx;
struct rpcrdma_regbuf *rb = req->rl_rdmabuf;
- struct ib_sge *sge = &req->rl_send_sge[0];
+ struct ib_sge *sge = sc->sc_sges;
- if (unlikely(!rpcrdma_regbuf_is_mapped(rb))) {
- if (!__rpcrdma_dma_map_regbuf(ia, rb))
- return false;
- sge->addr = rdmab_addr(rb);
- sge->lkey = rdmab_lkey(rb);
- }
+ if (!rpcrdma_dma_map_regbuf(ia, rb))
+ goto out_regbuf;
+ sge->addr = rdmab_addr(rb);
sge->length = len;
+ sge->lkey = rdmab_lkey(rb);
ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr,
sge->length, DMA_TO_DEVICE);
- req->rl_send_wr.num_sge++;
+ sc->sc_wr.num_sge++;
return true;
+
+out_regbuf:
+ pr_err("rpcrdma: failed to DMA map a Send buffer\n");
+ return false;
}
/* Prepare the Send SGEs. The head and tail iovec, and each entry
@@ -541,10 +575,11 @@ static bool
rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
{
+ struct rpcrdma_sendctx *sc = req->rl_sendctx;
unsigned int sge_no, page_base, len, remaining;
struct rpcrdma_regbuf *rb = req->rl_sendbuf;
struct ib_device *device = ia->ri_device;
- struct ib_sge *sge = req->rl_send_sge;
+ struct ib_sge *sge = sc->sc_sges;
u32 lkey = ia->ri_pd->local_dma_lkey;
struct page *page, **ppages;
@@ -552,7 +587,7 @@ rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
* DMA-mapped. Sync the content that has changed.
*/
if (!rpcrdma_dma_map_regbuf(ia, rb))
- return false;
+ goto out_regbuf;
sge_no = 1;
sge[sge_no].addr = rdmab_addr(rb);
sge[sge_no].length = xdr->head[0].iov_len;
@@ -607,7 +642,7 @@ rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
sge[sge_no].length = len;
sge[sge_no].lkey = lkey;
- req->rl_mapped_sges++;
+ sc->sc_unmap_count++;
ppages++;
remaining -= len;
page_base = 0;
@@ -633,56 +668,61 @@ map_tail:
goto out_mapping_err;
sge[sge_no].length = len;
sge[sge_no].lkey = lkey;
- req->rl_mapped_sges++;
+ sc->sc_unmap_count++;
}
out:
- req->rl_send_wr.num_sge = sge_no + 1;
+ sc->sc_wr.num_sge += sge_no;
+ if (sc->sc_unmap_count)
+ __set_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);
return true;
+out_regbuf:
+ pr_err("rpcrdma: failed to DMA map a Send buffer\n");
+ return false;
+
out_mapping_overflow:
+ rpcrdma_unmap_sendctx(sc);
pr_err("rpcrdma: too many Send SGEs (%u)\n", sge_no);
return false;
out_mapping_err:
+ rpcrdma_unmap_sendctx(sc);
pr_err("rpcrdma: Send mapping error\n");
return false;
}
-bool
-rpcrdma_prepare_send_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
- u32 hdrlen, struct xdr_buf *xdr,
- enum rpcrdma_chunktype rtype)
+/**
+ * rpcrdma_prepare_send_sges - Construct SGEs for a Send WR
+ * @r_xprt: controlling transport
+ * @req: context of RPC Call being marshalled
+ * @hdrlen: size of transport header, in bytes
+ * @xdr: xdr_buf containing RPC Call
+ * @rtype: chunk type being encoded
+ *
+ * Returns 0 on success; otherwise a negative errno is returned.
+ */
+int
+rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_req *req, u32 hdrlen,
+ struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
{
- req->rl_send_wr.num_sge = 0;
- req->rl_mapped_sges = 0;
-
- if (!rpcrdma_prepare_hdr_sge(ia, req, hdrlen))
- goto out_map;
+ req->rl_sendctx = rpcrdma_sendctx_get_locked(&r_xprt->rx_buf);
+ if (!req->rl_sendctx)
+ return -ENOBUFS;
+ req->rl_sendctx->sc_wr.num_sge = 0;
+ req->rl_sendctx->sc_unmap_count = 0;
+ req->rl_sendctx->sc_req = req;
+ __clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);
+
+ if (!rpcrdma_prepare_hdr_sge(&r_xprt->rx_ia, req, hdrlen))
+ return -EIO;
if (rtype != rpcrdma_areadch)
- if (!rpcrdma_prepare_msg_sges(ia, req, xdr, rtype))
- goto out_map;
-
- return true;
-
-out_map:
- pr_err("rpcrdma: failed to DMA map a Send buffer\n");
- return false;
-}
-
-void
-rpcrdma_unmap_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
-{
- struct ib_device *device = ia->ri_device;
- struct ib_sge *sge;
- int count;
+ if (!rpcrdma_prepare_msg_sges(&r_xprt->rx_ia, req, xdr, rtype))
+ return -EIO;
- sge = &req->rl_send_sge[2];
- for (count = req->rl_mapped_sges; count--; sge++)
- ib_dma_unmap_page(device, sge->addr, sge->length,
- DMA_TO_DEVICE);
- req->rl_mapped_sges = 0;
+ return 0;
}
/**
@@ -833,12 +873,10 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
transfertypes[rtype], transfertypes[wtype],
xdr_stream_pos(xdr));
- if (!rpcrdma_prepare_send_sges(&r_xprt->rx_ia, req,
- xdr_stream_pos(xdr),
- &rqst->rq_snd_buf, rtype)) {
- ret = -EIO;
+ ret = rpcrdma_prepare_send_sges(r_xprt, req, xdr_stream_pos(xdr),
+ &rqst->rq_snd_buf, rtype);
+ if (ret)
goto out_err;
- }
return 0;
out_err:
@@ -970,14 +1008,13 @@ rpcrdma_mark_remote_invalidation(struct list_head *mws,
* straightforward to check the RPC header's direction field.
*/
static bool
-rpcrdma_is_bcall(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
- __be32 xid, __be32 proc)
+rpcrdma_is_bcall(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep)
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
{
struct xdr_stream *xdr = &rep->rr_stream;
__be32 *p;
- if (proc != rdma_msg)
+ if (rep->rr_proc != rdma_msg)
return false;
/* Peek at stream contents without advancing. */
@@ -992,7 +1029,7 @@ rpcrdma_is_bcall(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
return false;
/* RPC header */
- if (*p++ != xid)
+ if (*p++ != rep->rr_xid)
return false;
if (*p != cpu_to_be32(RPC_CALL))
return false;
@@ -1212,105 +1249,170 @@ rpcrdma_decode_error(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
return -EREMOTEIO;
}
+/* Perform XID lookup, reconstruction of the RPC reply, and
+ * RPC completion while holding the transport lock to ensure
+ * the rep, rqst, and rq_task pointers remain stable.
+ */
+void rpcrdma_complete_rqst(struct rpcrdma_rep *rep)
+{
+ struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
+ struct rpc_xprt *xprt = &r_xprt->rx_xprt;
+ struct rpc_rqst *rqst = rep->rr_rqst;
+ unsigned long cwnd;
+ int status;
+
+ xprt->reestablish_timeout = 0;
+
+ switch (rep->rr_proc) {
+ case rdma_msg:
+ status = rpcrdma_decode_msg(r_xprt, rep, rqst);
+ break;
+ case rdma_nomsg:
+ status = rpcrdma_decode_nomsg(r_xprt, rep);
+ break;
+ case rdma_error:
+ status = rpcrdma_decode_error(r_xprt, rep, rqst);
+ break;
+ default:
+ status = -EIO;
+ }
+ if (status < 0)
+ goto out_badheader;
+
+out:
+ spin_lock(&xprt->recv_lock);
+ cwnd = xprt->cwnd;
+ xprt->cwnd = r_xprt->rx_buf.rb_credits << RPC_CWNDSHIFT;
+ if (xprt->cwnd > cwnd)
+ xprt_release_rqst_cong(rqst->rq_task);
+
+ xprt_complete_rqst(rqst->rq_task, status);
+ xprt_unpin_rqst(rqst);
+ spin_unlock(&xprt->recv_lock);
+ return;
+
+/* If the incoming reply terminated a pending RPC, the next
+ * RPC call will post a replacement receive buffer as it is
+ * being marshaled.
+ */
+out_badheader:
+ dprintk("RPC: %5u %s: invalid rpcrdma reply (type %u)\n",
+ rqst->rq_task->tk_pid, __func__, be32_to_cpu(rep->rr_proc));
+ r_xprt->rx_stats.bad_reply_count++;
+ status = -EIO;
+ goto out;
+}
+
+void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
+{
+ /* Invalidate and unmap the data payloads before waking
+ * the waiting application. This guarantees the memory
+ * regions are properly fenced from the server before the
+ * application accesses the data. It also ensures proper
+ * send flow control: waking the next RPC waits until this
+ * RPC has relinquished all its Send Queue entries.
+ */
+ if (!list_empty(&req->rl_registered))
+ r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt,
+ &req->rl_registered);
+
+ /* Ensure that any DMA mapped pages associated with
+ * the Send of the RPC Call have been unmapped before
+ * allowing the RPC to complete. This protects argument
+ * memory not controlled by the RPC client from being
+ * re-used before we're done with it.
+ */
+ if (test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
+ r_xprt->rx_stats.reply_waits_for_send++;
+ out_of_line_wait_on_bit(&req->rl_flags,
+ RPCRDMA_REQ_F_TX_RESOURCES,
+ bit_wait,
+ TASK_UNINTERRUPTIBLE);
+ }
+}
+
+/* Reply handling runs in the poll worker thread. Anything that
+ * might wait is deferred to a separate workqueue.
+ */
+void rpcrdma_deferred_completion(struct work_struct *work)
+{
+ struct rpcrdma_rep *rep =
+ container_of(work, struct rpcrdma_rep, rr_work);
+ struct rpcrdma_req *req = rpcr_to_rdmar(rep->rr_rqst);
+
+ rpcrdma_mark_remote_invalidation(&req->rl_registered, rep);
+ rpcrdma_release_rqst(rep->rr_rxprt, req);
+ rpcrdma_complete_rqst(rep);
+}
+
/* Process received RPC/RDMA messages.
*
* Errors must result in the RPC task either being awakened, or
* allowed to timeout, to discover the errors at that time.
*/
-void
-rpcrdma_reply_handler(struct work_struct *work)
+void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
{
- struct rpcrdma_rep *rep =
- container_of(work, struct rpcrdma_rep, rr_work);
struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
struct rpc_xprt *xprt = &r_xprt->rx_xprt;
- struct xdr_stream *xdr = &rep->rr_stream;
+ struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpcrdma_req *req;
struct rpc_rqst *rqst;
- __be32 *p, xid, vers, proc;
- unsigned long cwnd;
- int status;
+ u32 credits;
+ __be32 *p;
dprintk("RPC: %s: incoming rep %p\n", __func__, rep);
if (rep->rr_hdrbuf.head[0].iov_len == 0)
goto out_badstatus;
- xdr_init_decode(xdr, &rep->rr_hdrbuf,
+ xdr_init_decode(&rep->rr_stream, &rep->rr_hdrbuf,
rep->rr_hdrbuf.head[0].iov_base);
/* Fixed transport header fields */
- p = xdr_inline_decode(xdr, 4 * sizeof(*p));
+ p = xdr_inline_decode(&rep->rr_stream, 4 * sizeof(*p));
if (unlikely(!p))
goto out_shortreply;
- xid = *p++;
- vers = *p++;
- p++; /* credits */
- proc = *p++;
+ rep->rr_xid = *p++;
+ rep->rr_vers = *p++;
+ credits = be32_to_cpu(*p++);
+ rep->rr_proc = *p++;
+
+ if (rep->rr_vers != rpcrdma_version)
+ goto out_badversion;
- if (rpcrdma_is_bcall(r_xprt, rep, xid, proc))
+ if (rpcrdma_is_bcall(r_xprt, rep))
return;
/* Match incoming rpcrdma_rep to an rpcrdma_req to
* get context for handling any incoming chunks.
*/
spin_lock(&xprt->recv_lock);
- rqst = xprt_lookup_rqst(xprt, xid);
+ rqst = xprt_lookup_rqst(xprt, rep->rr_xid);
if (!rqst)
goto out_norqst;
xprt_pin_rqst(rqst);
+
+ if (credits == 0)
+ credits = 1; /* don't deadlock */
+ else if (credits > buf->rb_max_requests)
+ credits = buf->rb_max_requests;
+ buf->rb_credits = credits;
+
spin_unlock(&xprt->recv_lock);
+
req = rpcr_to_rdmar(rqst);
req->rl_reply = rep;
+ rep->rr_rqst = rqst;
+ clear_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags);
dprintk("RPC: %s: reply %p completes request %p (xid 0x%08x)\n",
- __func__, rep, req, be32_to_cpu(xid));
-
- /* Invalidate and unmap the data payloads before waking the
- * waiting application. This guarantees the memory regions
- * are properly fenced from the server before the application
- * accesses the data. It also ensures proper send flow control:
- * waking the next RPC waits until this RPC has relinquished
- * all its Send Queue entries.
- */
- if (!list_empty(&req->rl_registered)) {
- rpcrdma_mark_remote_invalidation(&req->rl_registered, rep);
- r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt,
- &req->rl_registered);
- }
-
- xprt->reestablish_timeout = 0;
- if (vers != rpcrdma_version)
- goto out_badversion;
+ __func__, rep, req, be32_to_cpu(rep->rr_xid));
- switch (proc) {
- case rdma_msg:
- status = rpcrdma_decode_msg(r_xprt, rep, rqst);
- break;
- case rdma_nomsg:
- status = rpcrdma_decode_nomsg(r_xprt, rep);
- break;
- case rdma_error:
- status = rpcrdma_decode_error(r_xprt, rep, rqst);
- break;
- default:
- status = -EIO;
- }
- if (status < 0)
- goto out_badheader;
-
-out:
- spin_lock(&xprt->recv_lock);
- cwnd = xprt->cwnd;
- xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
- if (xprt->cwnd > cwnd)
- xprt_release_rqst_cong(rqst->rq_task);
-
- xprt_complete_rqst(rqst->rq_task, status);
- xprt_unpin_rqst(rqst);
- spin_unlock(&xprt->recv_lock);
- dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
- __func__, xprt, rqst, status);
+ if (list_empty(&req->rl_registered) &&
+ !test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags))
+ rpcrdma_complete_rqst(rep);
+ else
+ queue_work(rpcrdma_receive_wq, &rep->rr_work);
return;
out_badstatus:
@@ -1321,37 +1423,22 @@ out_badstatus:
}
return;
-/* If the incoming reply terminated a pending RPC, the next
- * RPC call will post a replacement receive buffer as it is
- * being marshaled.
- */
out_badversion:
dprintk("RPC: %s: invalid version %d\n",
- __func__, be32_to_cpu(vers));
- status = -EIO;
- r_xprt->rx_stats.bad_reply_count++;
- goto out;
-
-out_badheader:
- dprintk("RPC: %5u %s: invalid rpcrdma reply (type %u)\n",
- rqst->rq_task->tk_pid, __func__, be32_to_cpu(proc));
- r_xprt->rx_stats.bad_reply_count++;
- status = -EIO;
- goto out;
+ __func__, be32_to_cpu(rep->rr_vers));
+ goto repost;
-/* The req was still available, but by the time the recv_lock
- * was acquired, the rqst and task had been released. Thus the RPC
- * has already been terminated.
+/* The RPC transaction has already been terminated, or the header
+ * is corrupt.
*/
out_norqst:
spin_unlock(&xprt->recv_lock);
dprintk("RPC: %s: no match for incoming xid 0x%08x\n",
- __func__, be32_to_cpu(xid));
+ __func__, be32_to_cpu(rep->rr_xid));
goto repost;
out_shortreply:
dprintk("RPC: %s: short/invalid reply\n", __func__);
- goto repost;
/* If no pending RPC transaction was matched, post a replacement
* receive buffer before returning.
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index c84e2b644e13..646c24494ea7 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -1,4 +1,5 @@
/*
+ * Copyright (c) 2014-2017 Oracle. All rights reserved.
* Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
*
* This software is available to you under a choice of one of two
@@ -678,16 +679,14 @@ xprt_rdma_free(struct rpc_task *task)
struct rpc_rqst *rqst = task->tk_rqstp;
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
- struct rpcrdma_ia *ia = &r_xprt->rx_ia;
- if (req->rl_backchannel)
+ if (test_bit(RPCRDMA_REQ_F_BACKCHANNEL, &req->rl_flags))
return;
dprintk("RPC: %s: called on 0x%p\n", __func__, req->rl_reply);
- if (!list_empty(&req->rl_registered))
- ia->ri_ops->ro_unmap_safe(r_xprt, req, !RPC_IS_ASYNC(task));
- rpcrdma_unmap_sges(ia, req);
+ if (test_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags))
+ rpcrdma_release_rqst(r_xprt, req);
rpcrdma_buffer_put(req);
}
@@ -728,7 +727,8 @@ xprt_rdma_send_request(struct rpc_task *task)
/* On retransmit, remove any previously registered chunks */
if (unlikely(!list_empty(&req->rl_registered)))
- r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req, false);
+ r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt,
+ &req->rl_registered);
rc = rpcrdma_marshal_req(r_xprt, rqst);
if (rc < 0)
@@ -742,6 +742,7 @@ xprt_rdma_send_request(struct rpc_task *task)
goto drop_connection;
req->rl_connect_cookie = xprt->connect_cookie;
+ set_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags);
if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req))
goto drop_connection;
@@ -789,11 +790,13 @@ void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
r_xprt->rx_stats.failed_marshal_count,
r_xprt->rx_stats.bad_reply_count,
r_xprt->rx_stats.nomsg_call_count);
- seq_printf(seq, "%lu %lu %lu %lu\n",
+ seq_printf(seq, "%lu %lu %lu %lu %lu %lu\n",
r_xprt->rx_stats.mrs_recovered,
r_xprt->rx_stats.mrs_orphaned,
r_xprt->rx_stats.mrs_allocated,
- r_xprt->rx_stats.local_inv_needed);
+ r_xprt->rx_stats.local_inv_needed,
+ r_xprt->rx_stats.empty_sendctx_q,