summary | refs | log | tree | commit | diff | stats
path: root/net/sunrpc/xprtrdma
diff options
context:
space:
mode:
author Linus Torvalds <torvalds@linux-foundation.org> 2019-07-18 14:32:33 -0700
committer Linus Torvalds <torvalds@linux-foundation.org> 2019-07-18 14:32:33 -0700
commit 6860c981b9672324cb53b883cfda8d2ea1445ff1 (patch)
tree d97a47712479bc2f886012d7f2da8b00ac504f3a /net/sunrpc/xprtrdma
parent 0570bc8b7c9b41deba6f61ac218922e7168ad648 (diff)
parent d5b9216fd5114be4ed98ca9c1ecc5f164cd8cf5e (diff)
Merge tag 'nfs-for-5.3-1' of git://git.linux-nfs.org/projects/trondmy/linux-nfs
Pull NFS client updates from Trond Myklebust:
 "Highlights include:

  Stable fixes:
   - SUNRPC: Ensure bvecs are re-synced when we re-encode the RPC request
   - Fix an Oops in ff_layout_track_ds_error due to a PTR_ERR() dereference
   - Revert buggy NFS readdirplus optimisation
   - NFSv4: Handle the special Linux file open access mode
   - pnfs: Fix a problem where we gratuitously start doing I/O through the MDS

  Features:
   - Allow NFS client to set up multiple TCP connections to the server using a new 'nconnect=X' mount option. Queue length is used to balance load.
   - Enhance statistics reporting to report on all transports when using multiple connections.
   - Speed up SUNRPC by removing bh-safe spinlocks
   - Add a mechanism to allow NFSv4 to request that containers set a unique per-host identifier for when the hostname is not set.
   - Ensure NFSv4 updates the lease_time after a clientid update

  Bugfixes and cleanup:
   - Fix use-after-free in rpcrdma_post_recvs
   - Fix a memory leak when nfs_match_client() is interrupted
   - Fix buggy file access checking in NFSv4 open for execute
   - disable unsupported client side deduplication
   - Fix spurious client disconnections
   - Fix occasional RDMA transport deadlock
   - Various RDMA cleanups
   - Various tracepoint fixes
   - Fix the TCP callback channel to guarantee the server can actually send the number of callback requests that was negotiated at mount time"

* tag 'nfs-for-5.3-1' of git://git.linux-nfs.org/projects/trondmy/linux-nfs: (68 commits)
  pnfs/flexfiles: Add tracepoints for detecting pnfs fallback to MDS
  pnfs: Fix a problem where we gratuitously start doing I/O through the MDS
  SUNRPC: Optimise transport balancing code
  SUNRPC: Ensure the bvecs are reset when we re-encode the RPC request
  pnfs/flexfiles: Fix PTR_ERR() dereferences in ff_layout_track_ds_error
  NFSv4: Don't use the zero stateid with layoutget
  SUNRPC: Fix up backchannel slot table accounting
  SUNRPC: Fix initialisation of struct rpc_xprt_switch
  SUNRPC: Skip zero-refcount transports
  SUNRPC: Replace division by multiplication in calculation of queue length
  NFSv4: Validate the stateid before applying it to state recovery
  nfs4.0: Refetch lease_time after clientid update
  nfs4: Rename nfs41_setup_state_renewal
  nfs4: Make nfs4_proc_get_lease_time available for nfs4.0
  nfs: Fix copy-and-paste error in debug message
  NFS: Replace 16 seq_printf() calls by seq_puts()
  NFS: Use seq_putc() in nfs_show_stats()
  Revert "NFS: readdirplus optimization by cache mechanism" (memleak)
  SUNRPC: Fix transport accounting when caller specifies an rpc_xprt
  NFS: Record task, client ID, and XID in xdr_status trace points
  ...
Diffstat (limited to 'net/sunrpc/xprtrdma')
-rw-r--r-- net/sunrpc/xprtrdma/backchannel.c | 7
-rw-r--r-- net/sunrpc/xprtrdma/frwr_ops.c | 327
-rw-r--r-- net/sunrpc/xprtrdma/rpc_rdma.c | 152
-rw-r--r-- net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 4
-rw-r--r-- net/sunrpc/xprtrdma/svc_rdma_transport.c | 8
-rw-r--r-- net/sunrpc/xprtrdma/transport.c | 84
-rw-r--r-- net/sunrpc/xprtrdma/verbs.c | 115
-rw-r--r-- net/sunrpc/xprtrdma/xprt_rdma.h | 45
8 files changed, 423 insertions, 319 deletions
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index ce986591f213..59e624b1d7a0 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -52,6 +52,13 @@ size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *xprt)
return maxmsg - RPCRDMA_HDRLEN_MIN;
}
+unsigned int xprt_rdma_bc_max_slots(struct rpc_xprt *xprt)
+{
+ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+
+ return r_xprt->rx_buf.rb_bc_srv_max_requests;
+}
+
static int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
{
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index 794ba4ca0994..0b6dad7580a1 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -144,6 +144,26 @@ frwr_mr_recycle_worker(struct work_struct *work)
frwr_release_mr(mr);
}
+/* frwr_reset - Place MRs back on the free list
+ * @req: request to reset
+ *
+ * Used after a failed marshal. For FRWR, this means the MRs
+ * don't have to be fully released and recreated.
+ *
+ * NB: This is safe only as long as none of @req's MRs are
+ * involved with an ongoing asynchronous FAST_REG or LOCAL_INV
+ * Work Request.
+ */
+void frwr_reset(struct rpcrdma_req *req)
+{
+ while (!list_empty(&req->rl_registered)) {
+ struct rpcrdma_mr *mr;
+
+ mr = rpcrdma_mr_pop(&req->rl_registered);
+ rpcrdma_mr_unmap_and_put(mr);
+ }
+}
+
/**
* frwr_init_mr - Initialize one MR
* @ia: interface adapter
@@ -168,7 +188,6 @@ int frwr_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr)
goto out_list_err;
mr->frwr.fr_mr = frmr;
- mr->frwr.fr_state = FRWR_IS_INVALID;
mr->mr_dir = DMA_NONE;
INIT_LIST_HEAD(&mr->mr_list);
INIT_WORK(&mr->mr_recycle, frwr_mr_recycle_worker);
@@ -298,65 +317,6 @@ size_t frwr_maxpages(struct rpcrdma_xprt *r_xprt)
}
/**
- * frwr_wc_fastreg - Invoked by RDMA provider for a flushed FastReg WC
- * @cq: completion queue (ignored)
- * @wc: completed WR
- *
- */
-static void
-frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc)
-{
- struct ib_cqe *cqe = wc->wr_cqe;
- struct rpcrdma_frwr *frwr =
- container_of(cqe, struct rpcrdma_frwr, fr_cqe);
-
- /* WARNING: Only wr_cqe and status are reliable at this point */
- if (wc->status != IB_WC_SUCCESS)
- frwr->fr_state = FRWR_FLUSHED_FR;
- trace_xprtrdma_wc_fastreg(wc, frwr);
-}
-
-/**
- * frwr_wc_localinv - Invoked by RDMA provider for a flushed LocalInv WC
- * @cq: completion queue (ignored)
- * @wc: completed WR
- *
- */
-static void
-frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc)
-{
- struct ib_cqe *cqe = wc->wr_cqe;
- struct rpcrdma_frwr *frwr = container_of(cqe, struct rpcrdma_frwr,
- fr_cqe);
-
- /* WARNING: Only wr_cqe and status are reliable at this point */
- if (wc->status != IB_WC_SUCCESS)
- frwr->fr_state = FRWR_FLUSHED_LI;
- trace_xprtrdma_wc_li(wc, frwr);
-}
-
-/**
- * frwr_wc_localinv_wake - Invoked by RDMA provider for a signaled LocalInv WC
- * @cq: completion queue (ignored)
- * @wc: completed WR
- *
- * Awaken anyone waiting for an MR to finish being fenced.
- */
-static void
-frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc)
-{
- struct ib_cqe *cqe = wc->wr_cqe;
- struct rpcrdma_frwr *frwr = container_of(cqe, struct rpcrdma_frwr,
- fr_cqe);
-
- /* WARNING: Only wr_cqe and status are reliable at this point */
- if (wc->status != IB_WC_SUCCESS)
- frwr->fr_state = FRWR_FLUSHED_LI;
- trace_xprtrdma_wc_li_wake(wc, frwr);
- complete(&frwr->fr_linv_done);
-}
-
-/**
* frwr_map - Register a memory region
* @r_xprt: controlling transport
* @seg: memory region co-ordinates
@@ -378,23 +338,15 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
{
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
bool holes_ok = ia->ri_mrtype == IB_MR_TYPE_SG_GAPS;
- struct rpcrdma_frwr *frwr;
struct rpcrdma_mr *mr;
struct ib_mr *ibmr;
struct ib_reg_wr *reg_wr;
int i, n;
u8 key;
- mr = NULL;
- do {
- if (mr)
- rpcrdma_mr_recycle(mr);
- mr = rpcrdma_mr_get(r_xprt);
- if (!mr)
- return ERR_PTR(-EAGAIN);
- } while (mr->frwr.fr_state != FRWR_IS_INVALID);
- frwr = &mr->frwr;
- frwr->fr_state = FRWR_IS_VALID;
+ mr = rpcrdma_mr_get(r_xprt);
+ if (!mr)
+ goto out_getmr_err;
if (nsegs > ia->ri_max_frwr_depth)
nsegs = ia->ri_max_frwr_depth;
@@ -423,7 +375,7 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
if (!mr->mr_nents)
goto out_dmamap_err;
- ibmr = frwr->fr_mr;
+ ibmr = mr->frwr.fr_mr;
n = ib_map_mr_sg(ibmr, mr->mr_sg, mr->mr_nents, NULL, PAGE_SIZE);
if (unlikely(n != mr->mr_nents))
goto out_mapmr_err;
@@ -433,7 +385,7 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
key = (u8)(ibmr->rkey & 0x000000FF);
ib_update_fast_reg_key(ibmr, ++key);
- reg_wr = &frwr->fr_regwr;
+ reg_wr = &mr->frwr.fr_regwr;
reg_wr->mr = ibmr;
reg_wr->key = ibmr->rkey;
reg_wr->access = writing ?
@@ -448,6 +400,10 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
*out = mr;
return seg;
+out_getmr_err:
+ xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
+ return ERR_PTR(-EAGAIN);
+
out_dmamap_err:
mr->mr_dir = DMA_NONE;
trace_xprtrdma_frwr_sgerr(mr, i);
@@ -461,6 +417,23 @@ out_mapmr_err:
}
/**
+ * frwr_wc_fastreg - Invoked by RDMA provider for a flushed FastReg WC
+ * @cq: completion queue (ignored)
+ * @wc: completed WR
+ *
+ */
+static void frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc)
+{
+ struct ib_cqe *cqe = wc->wr_cqe;
+ struct rpcrdma_frwr *frwr =
+ container_of(cqe, struct rpcrdma_frwr, fr_cqe);
+
+ /* WARNING: Only wr_cqe and status are reliable at this point */
+ trace_xprtrdma_wc_fastreg(wc, frwr);
+ /* The MR will get recycled when the associated req is retransmitted */
+}
+
+/**
* frwr_send - post Send WR containing the RPC Call message
* @ia: interface adapter
* @req: Prepared RPC Call
@@ -512,31 +485,75 @@ void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs)
if (mr->mr_handle == rep->rr_inv_rkey) {
list_del_init(&mr->mr_list);
trace_xprtrdma_mr_remoteinv(mr);
- mr->frwr.fr_state = FRWR_IS_INVALID;
rpcrdma_mr_unmap_and_put(mr);
break; /* only one invalidated MR per RPC */
}
}
+static void __frwr_release_mr(struct ib_wc *wc, struct rpcrdma_mr *mr)
+{
+ if (wc->status != IB_WC_SUCCESS)
+ rpcrdma_mr_recycle(mr);
+ else
+ rpcrdma_mr_unmap_and_put(mr);
+}
+
/**
- * frwr_unmap_sync - invalidate memory regions that were registered for @req
- * @r_xprt: controlling transport
- * @mrs: list of MRs to process
+ * frwr_wc_localinv - Invoked by RDMA provider for a LOCAL_INV WC
+ * @cq: completion queue (ignored)
+ * @wc: completed WR
+ *
+ */
+static void frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc)
+{
+ struct ib_cqe *cqe = wc->wr_cqe;
+ struct rpcrdma_frwr *frwr =
+ container_of(cqe, struct rpcrdma_frwr, fr_cqe);
+ struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr);
+
+ /* WARNING: Only wr_cqe and status are reliable at this point */
+ trace_xprtrdma_wc_li(wc, frwr);
+ __frwr_release_mr(wc, mr);
+}
+
+/**
+ * frwr_wc_localinv_wake - Invoked by RDMA provider for a LOCAL_INV WC
+ * @cq: completion queue (ignored)
+ * @wc: completed WR
*
- * Sleeps until it is safe for the host CPU to access the
- * previously mapped memory regions.
+ * Awaken anyone waiting for an MR to finish being fenced.
+ */
+static void frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc)
+{
+ struct ib_cqe *cqe = wc->wr_cqe;
+ struct rpcrdma_frwr *frwr =
+ container_of(cqe, struct rpcrdma_frwr, fr_cqe);
+ struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr);
+
+ /* WARNING: Only wr_cqe and status are reliable at this point */
+ trace_xprtrdma_wc_li_wake(wc, frwr);
+ complete(&frwr->fr_linv_done);
+ __frwr_release_mr(wc, mr);
+}
+
+/**
+ * frwr_unmap_sync - invalidate memory regions that were registered for @req
+ * @r_xprt: controlling transport instance
+ * @req: rpcrdma_req with a non-empty list of MRs to process
*
- * Caller ensures that @mrs is not empty before the call. This
- * function empties the list.
+ * Sleeps until it is safe for the host CPU to access the previously mapped
+ * memory regions. This guarantees that registered MRs are properly fenced
+ * from the server before the RPC consumer accesses the data in them. It
+ * also ensures proper Send flow control: waking the next RPC waits until
+ * this RPC has relinquished all its Send Queue entries.
*/
-void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct list_head *mrs)
+void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
{
struct ib_send_wr *first, **prev, *last;
const struct ib_send_wr *bad_wr;
- struct rpcrdma_ia *ia = &r_xprt->rx_ia;
struct rpcrdma_frwr *frwr;
struct rpcrdma_mr *mr;
- int count, rc;
+ int rc;
/* ORDER: Invalidate all of the MRs first
*
@@ -544,33 +561,32 @@ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct list_head *mrs)
* a single ib_post_send() call.
*/
frwr = NULL;
- count = 0;
prev = &first;
- list_for_each_entry(mr, mrs, mr_list) {
- mr->frwr.fr_state = FRWR_IS_INVALID;
+ while (!list_empty(&req->rl_registered)) {
+ mr = rpcrdma_mr_pop(&req->rl_registered);
- frwr = &mr->frwr;
trace_xprtrdma_mr_localinv(mr);
+ r_xprt->rx_stats.local_inv_needed++;
+ frwr = &mr->frwr;
frwr->fr_cqe.done = frwr_wc_localinv;
last = &frwr->fr_invwr;
- memset(last, 0, sizeof(*last));
+ last->next = NULL;
last->wr_cqe = &frwr->fr_cqe;
+ last->sg_list = NULL;
+ last->num_sge = 0;
last->opcode = IB_WR_LOCAL_INV;
+ last->send_flags = IB_SEND_SIGNALED;
last->ex.invalidate_rkey = mr->mr_handle;
- count++;
*prev = last;
prev = &last->next;
}
- if (!frwr)
- goto unmap;
/* Strong send queue ordering guarantees that when the
* last WR in the chain completes, all WRs in the chain
* are complete.
*/
- last->send_flags = IB_SEND_SIGNALED;
frwr->fr_cqe.done = frwr_wc_localinv_wake;
reinit_completion(&frwr->fr_linv_done);
@@ -578,37 +594,126 @@ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct list_head *mrs)
* replaces the QP. The RPC reply handler won't call us
* unless ri_id->qp is a valid pointer.
*/
- r_xprt->rx_stats.local_inv_needed++;
bad_wr = NULL;
- rc = ib_post_send(ia->ri_id->qp, first, &bad_wr);
+ rc = ib_post_send(r_xprt->rx_ia.ri_id->qp, first, &bad_wr);
+ trace_xprtrdma_post_send(req, rc);
+
+ /* The final LOCAL_INV WR in the chain is supposed to
+ * do the wake. If it was never posted, the wake will
+ * not happen, so don't wait in that case.
+ */
if (bad_wr != first)
wait_for_completion(&frwr->fr_linv_done);
- if (rc)
- goto out_release;
+ if (!rc)
+ return;
- /* ORDER: Now DMA unmap all of the MRs, and return
- * them to the free MR list.
+ /* Recycle MRs in the LOCAL_INV chain that did not get posted.
*/
-unmap:
- while (!list_empty(mrs)) {
- mr = rpcrdma_mr_pop(mrs);
- rpcrdma_mr_unmap_and_put(mr);
+ while (bad_wr) {
+ frwr = container_of(bad_wr, struct rpcrdma_frwr,
+ fr_invwr);
+ mr = container_of(frwr, struct rpcrdma_mr, frwr);
+ bad_wr = bad_wr->next;
+
+ list_del_init(&mr->mr_list);
+ rpcrdma_mr_recycle(mr);
}
- return;
+}
-out_release:
- pr_err("rpcrdma: FRWR invalidate ib_post_send returned %i\n", rc);
+/**
+ * frwr_wc_localinv_done - Invoked by RDMA provider for a signaled LOCAL_INV WC
+ * @cq: completion queue (ignored)
+ * @wc: completed WR
+ *
+ */
+static void frwr_wc_localinv_done(struct ib_cq *cq, struct ib_wc *wc)
+{
+ struct ib_cqe *cqe = wc->wr_cqe;
+ struct rpcrdma_frwr *frwr =
+ container_of(cqe, struct rpcrdma_frwr, fr_cqe);
+ struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr);
- /* Unmap and release the MRs in the LOCAL_INV WRs that did not
- * get posted.
+ /* WARNING: Only wr_cqe and status are reliable at this point */
+ trace_xprtrdma_wc_li_done(wc, frwr);
+ rpcrdma_complete_rqst(frwr->fr_req->rl_reply);
+ __frwr_release_mr(wc, mr);
+}
+
+/**
+ * frwr_unmap_async - invalidate memory regions that were registered for @req
+ * @r_xprt: controlling transport instance
+ * @req: rpcrdma_req with a non-empty list of MRs to process
+ *
+ * This guarantees that registered MRs are properly fenced from the
+ * server before the RPC consumer accesses the data in them. It also
+ * ensures proper Send flow control: waking the next RPC waits until
+ * this RPC has relinquished all its Send Queue entries.
+ */
+void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
+{
+ struct ib_send_wr *first, *last, **prev;
+ const struct ib_send_wr *bad_wr;
+ struct rpcrdma_frwr *frwr;
+ struct rpcrdma_mr *mr;
+ int rc;
+
+ /* Chain the LOCAL_INV Work Requests and post them with
+ * a single ib_post_send() call.
+ */
+ frwr = NULL;
+ prev = &first;
+ while (!list_empty(&req->rl_registered)) {
+ mr = rpcrdma_mr_pop(&req->rl_registered);
+
+ trace_xprtrdma_mr_localinv(mr);
+ r_xprt->rx_stats.local_inv_needed++;
+
+ frwr = &mr->frwr;
+ frwr->fr_cqe.done = frwr_wc_localinv;
+ frwr->fr_req = req;
+ last = &frwr->fr_invwr;
+ last->next = NULL;
+ last->wr_cqe = &frwr->fr_cqe;
+ last->sg_list = NULL;
+ last->num_sge = 0;
+ last->opcode = IB_WR_LOCAL_INV;
+ last->send_flags = IB_SEND_SIGNALED;
+ last->ex.invalidate_rkey = mr->mr_handle;
+
+ *prev = last;
+ prev = &last->next;
+ }
+
+ /* Strong send queue ordering guarantees that when the
+ * last WR in the chain completes, all WRs in the chain
+ * are complete. The last completion will wake up the
+ * RPC waiter.
+ */
+ frwr->fr_cqe.done = frwr_wc_localinv_done;
+
+ /* Transport disconnect drains the receive CQ before it
+ * replaces the QP. The RPC reply handler won't call us
+ * unless ri_id->qp is a valid pointer.
+ */
+ bad_wr = NULL;
+ rc = ib_post_send(r_xprt->rx_ia.ri_id->qp, first, &bad_wr);
+ trace_xprtrdma_post_send(req, rc);
+ if (!rc)
+ return;
+
+ /* Recycle MRs in the LOCAL_INV chain that did not get posted.
*/
while (bad_wr) {
- frwr = container_of(bad_wr, struct rpcrdma_frwr,
- fr_invwr);
+ frwr = container_of(bad_wr, struct rpcrdma_frwr, fr_invwr);
mr = container_of(frwr, struct rpcrdma_mr, frwr);
bad_wr = bad_wr->next;
- list_del_init(&mr->mr_list);
rpcrdma_mr_recycle(mr);
}
+
+ /* The final LOCAL_INV WR in the chain is supposed to
+ * do the wake. If it was never posted, the wake will
+ * not happen, so wake here in that case.
+ */
+ rpcrdma_complete_rqst(req->rl_reply);
}
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 85115a2e2639..4345e6912392 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -366,6 +366,9 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
unsigned int pos;
int nsegs;
+ if (rtype == rpcrdma_noch)
+ goto done;
+
pos = rqst->rq_snd_buf.head[0].iov_len;
if (rtype == rpcrdma_areadch)
pos = 0;
@@ -389,7 +392,8 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
nsegs -= mr->mr_nents;
} while (nsegs);
- return 0;
+done:
+ return encode_item_not_present(xdr);
}
/* Register and XDR encode the Write list. Supports encoding a list
@@ -417,6 +421,9 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
int nsegs, nchunks;
__be32 *segcount;
+ if (wtype != rpcrdma_writech)
+ goto done;
+
seg = req->rl_segments;
nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf,
rqst->rq_rcv_buf.head[0].iov_len,
@@ -451,7 +458,8 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
/* Update count of segments in this Write chunk */
*segcount = cpu_to_be32(nchunks);
- return 0;
+done:
+ return encode_item_not_present(xdr);
}
/* Register and XDR encode the Reply chunk. Supports encoding an array
@@ -476,6 +484,9 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
int nsegs, nchunks;
__be32 *segcount;
+ if (wtype != rpcrdma_replych)
+ return encode_item_not_present(xdr);
+
seg = req->rl_segments;
nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, 0, wtype, seg);
if (nsegs < 0)
@@ -511,6 +522,16 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
return 0;
}
+static void rpcrdma_sendctx_done(struct kref *kref)
+{
+ struct rpcrdma_req *req =
+ container_of(kref, struct rpcrdma_req, rl_kref);
+ struct rpcrdma_rep *rep = req->rl_reply;
+
+ rpcrdma_complete_rqst(rep);
+ rep->rr_rxprt->rx_stats.reply_waits_for_send++;
+}
+
/**
* rpcrdma_sendctx_unmap - DMA-unmap Send buffer
* @sc: sendctx containing SGEs to unmap
@@ -520,6 +541,9 @@ void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc)
{
struct ib_sge *sge;
+ if (!sc->sc_unmap_count)
+ return;
+
/* The first two SGEs contain the transport header and
* the inline buffer. These are always left mapped so
* they can be cheaply re-used.
@@ -529,9 +553,7 @@ void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc)
ib_dma_unmap_page(sc->sc_device, sge->addr, sge->length,
DMA_TO_DEVICE);
- if (test_and_clear_bit(RPCRDMA_REQ_F_TX_RESOURCES,
- &sc->sc_req->rl_flags))
- wake_up_bit(&sc->sc_req->rl_flags, RPCRDMA_REQ_F_TX_RESOURCES);
+ kref_put(&sc->sc_req->rl_kref, rpcrdma_sendctx_done);
}
/* Prepare an SGE for the RPC-over-RDMA transport header.
@@ -666,7 +688,7 @@ map_tail:
out:
sc->sc_wr.num_sge += sge_no;
if (sc->sc_unmap_count)
- __set_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);
+ kref_get(&req->rl_kref);
return true;
out_regbuf:
@@ -699,22 +721,28 @@ rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_req *req, u32 hdrlen,
struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
{
+ int ret;
+
+ ret = -EAGAIN;
req->rl_sendctx = rpcrdma_sendctx_get_locked(r_xprt);
if (!req->rl_sendctx)
- return -EAGAIN;
+ goto err;
req->rl_sendctx->sc_wr.num_sge = 0;
req->rl_sendctx->sc_unmap_count = 0;
req->rl_sendctx->sc_req = req;
- __clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);
+ kref_init(&req->rl_kref);
+ ret = -EIO;
if (!rpcrdma_prepare_hdr_sge(r_xprt, req, hdrlen))
- return -EIO;
-
+ goto err;
if (rtype != rpcrdma_areadch)
if (!rpcrdma_prepare_msg_sges(r_xprt, req, xdr, rtype))
- return -EIO;
-
+ goto err;
return 0;
+
+err:
+ trace_xprtrdma_prepsend_failed(&req->rl_slot, ret);
+ return ret;
}
/**
@@ -842,50 +870,28 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
* send a Call message with a Position Zero Read chunk and a
* regular Read chunk at the same time.
*/
- if (rtype != rpcrdma_noch) {
- ret = rpcrdma_encode_read_list(r_xprt, req, rqst, rtype);
- if (ret)
- goto out_err;
- }
- ret = encode_item_not_present(xdr);
+ ret = rpcrdma_encode_read_list(r_xprt, req, rqst, rtype);
if (ret)
goto out_err;
-
- if (wtype == rpcrdma_writech) {
- ret = rpcrdma_encode_write_list(r_xprt, req, rqst, wtype);
- if (ret)
- goto out_err;
- }
- ret = encode_item_not_present(xdr);
+ ret = rpcrdma_encode_write_list(r_xprt, req, rqst, wtype);
if (ret)
goto out_err;
-
- if (wtype != rpcrdma_replych)
- ret = encode_item_not_present(xdr);
- else
- ret = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, wtype);
+ ret = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, wtype);
if (ret)
goto out_err;
- trace_xprtrdma_marshal(rqst, xdr_stream_pos(xdr), rtype, wtype);
-
- ret = rpcrdma_prepare_send_sges(r_xprt, req, xdr_stream_pos(xdr),
+ ret = rpcrdma_prepare_send_sges(r_xprt, req, req->rl_hdrbuf.len,
&rqst->rq_snd_buf, rtype);
if (ret)
goto out_err;
+
+ trace_xprtrdma_marshal(req, rtype, wtype);
return 0;
out_err:
trace_xprtrdma_marshal_failed(rqst, ret);
- switch (ret) {
- case -EAGAIN:
- xprt_wait_for_buffer_space(rqst->rq_xprt);
- break;
- case -ENOBUFS:
- break;
- default:
- r_xprt->rx_stats.failed_marshal_count++;
- }
+ r_xprt->rx_stats.failed_marshal_count++;
+ frwr_reset(req);
return ret;
}
@@ -1269,51 +1275,17 @@ out_badheader:
goto out;
}
-void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
-{
- /* Invalidate and unmap the data payloads before waking
- * the waiting application. This guarantees the memory
- * regions are properly fenced from the server before the
- * application accesses the data. It also ensures proper
- * send flow control: waking the next RPC waits until this
- * RPC has relinquished all its Send Queue entries.
- */
- if (!list_empty(&req->rl_registered))
- frwr_unmap_sync(r_xprt, &req->rl_registered);
-
- /* Ensure that any DMA mapped pages associated with
- * the Send of the RPC Call have been unmapped before
- * allowing the RPC to complete. This protects argument
- * memory not controlled by the RPC client from being
- * re-used before we're done with it.
- */
- if (test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
- r_xprt->rx_stats.reply_waits_for_send++;
- out_of_line_wait_on_bit(&req->rl_flags,
- RPCRDMA_REQ_F_TX_RESOURCES,
- bit_wait,
- TASK_UNINTERRUPTIBLE);
- }
-}
-
-/* Reply handling runs in the poll worker thread. Anything that
- * might wait is deferred to a separate workqueue.
- */
-void rpcrdma_deferred_completion(struct work_struct *work)
+static void rpcrdma_reply_done(struct kref *kref)
{
- struct rpcrdma_rep *rep =
- container_of(work, struct rpcrdma_rep, rr_work);
- struct rpcrdma_req *req = rpcr_to_rdmar(rep->rr_rqst);
- struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
+ struct rpcrdma_req *req =
+ container_of(kref, struct rpcrdma_req, rl_kref);
- trace_xprtrdma_defer_cmp(rep);
- if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE)
- frwr_reminv(rep, &req->rl_registered);
- rpcrdma_release_rqst(r_xprt, req);
- rpcrdma_complete_rqst(rep);
+ rpcrdma_complete_rqst(req->rl_reply);
}
-/* Process received RPC/RDMA messages.
+/**
+ * rpcrdma_reply_handler - Process received RPC/RDMA messages
+ * @rep: Incoming rpcrdma_rep object to process
*
* Errors must result in the RPC task either being awakened, or
* allowed to timeout, to discover the errors at that time.
@@ -1360,10 +1332,10 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
else if (credits > buf->rb_max_requests)
credits = buf->rb_max_requests;
if (buf->rb_credits != credits) {
- spin_lock_bh(&xprt->transport_lock);
+ spin_lock(&xprt->transport_lock);
buf->rb_credits = credits;
xprt->cwnd = credits << RPC_CWNDSHIFT;
- spin_unlock_bh(&xprt->transport_lock);
+ spin_unlock(&xprt->transport_lock);
}
req = rpcr_to_rdmar(rqst);
@@ -1373,10 +1345,16 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
}
req->rl_reply = rep;
rep->rr_rqst = rqst;
- clear_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags);
trace_xprtrdma_reply(rqst->rq_task, rep, req, credits);
- queue_work(buf->rb_completion_wq, &rep->rr_work);
+
+ if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE)
+ frwr_reminv(rep, &req->rl_registered);
+ if (!list_empty(&req->rl_registered))
+ frwr_unmap_async(r_xprt, req);
+ /* LocalInv completion will complete the RPC */
+ else
+ kref_put(&req->rl_kref, rpcrdma_reply_done);
return;
out_badversion:
diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
index bed57d8b5c19..d1fcc41d5eb5 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -72,9 +72,9 @@ int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, __be32 *rdma_resp,
else if (credits > r_xprt->rx_buf.rb_bc_max_requests)
credits = r_xprt->rx_buf.rb_bc_max_requests;
- spin_lock_bh(&xprt->transport_lock);
+ spin_lock(&xprt->transport_lock);
xprt->cwnd = credits << RPC_CWNDSHIFT;
- spin_unlock_bh(&xprt->transport_lock);
+ spin_unlock(&xprt->transport_lock);
spin_lock(&xprt->queue_lock);
ret = 0;
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index 0004535c0188..3fe665152d95 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -226,9 +226,9 @@ static void handle_connect_req(struct rdma_cm_id *new_cma_id,
* Enqueue the new transport on the accept queue of the listening
* transport
*/
- spin_lock_bh(&listen_xprt->sc_lock);
+ spin_lock(&listen_xprt->sc_lock);
list_add_tail(&newxprt->sc_accept_q, &listen_xprt->sc_accept_q);
- spin_unlock_bh(&listen_xprt->sc_lock);
+ spin_unlock(&listen_xprt->sc_lock);
set_bit(XPT_CONN, &listen_xprt->sc_xprt.xpt_flags);
svc_xprt_enqueue(&listen_xprt->sc_xprt);
@@ -401,7 +401,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
listen_rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt);
clear_bit(XPT_CONN, &xprt->xpt_flags);
/* Get the next entry off the accept list */
- spin_lock_bh(&listen_rdma->sc_lock);
+ spin_lock(&listen_rdma->sc_lock);
if (!list_empty(&listen_rdma->sc_accept_q)) {
newxprt = list_entry(listen_rdma->sc_accept_q.next,
struct svcxprt_rdma, sc_accept_q);
@@ -409,7 +409,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
}
if (!list_empty(&listen_rdma->sc_accept_q))
set_bit(XPT_CONN, &listen_rdma->sc_xprt.xpt_flags);
- spin_unlock_bh(&listen_rdma->sc_lock);
+ spin_unlock(&listen_rdma->sc_lock);
if (!newxprt)
return NULL;
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 1f73a6a7e43c..52abddac19e5 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -298,6 +298,7 @@ xprt_rdma_destroy(struct rpc_xprt *xprt)
module_put(THIS_MODULE);
}
+/* 60 second timeout, no retries */
static const struct rpc_timeout xprt_rdma_default_timeout = {
.to_initval = 60 * HZ,
.to_maxval = 60 * HZ,
@@ -323,8 +324,9 @@ xprt_setup_rdma(struct xprt_create *args)
if (!xprt)
return ERR_PTR(-ENOMEM);
- /* 60 second timeout, no retries */
xprt->timeout = &xprt_rdma_default_timeout;
+ xprt->connect_timeout = xprt->timeout->to_initval;
+ xprt->max_reconnect_timeout = xprt->timeout->to_maxval;
xprt->bind_timeout = RPCRDMA_BIND_TO;
xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO;
@@ -487,31 +489,64 @@ xprt_rdma_timer(struct rpc_xprt *xprt, struct rpc_task *task)
}
/**
- * xprt_rdma_connect - try to establish a transport connection
+ * xprt_rdma_tcp_set_connect_timeout - set timeouts for establishing a connection
+ * @xprt: controlling transport instance
+ * @connect_timeout: reconnect timeout after client disconnects
+ * @reconnect_timeout: reconnect timeout after server disconnects
+ *
+ */
+static void xprt_rdma_tcp_set_connect_timeout(struct rpc_xprt *xprt,
+ unsigned long connect_timeout,
+ unsigned long reconnect_timeout)
+{
+ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+
+ trace_xprtrdma_op_set_cto(r_xprt, connect_timeout, reconnect_timeout);
+
+ spin_lock(&xprt->transport_lock);
+
+ if (connect_timeout < xprt->connect_timeout) {
+ struct rpc_timeout to;
+ unsigned long initval;
+
+ to = *xprt->timeout;
+ initval = connect_timeout;
+ if (initval < RPCRDMA_INIT_REEST_TO << 1)
+ initval = RPCRDMA_INIT_REEST_TO << 1;
+ to.to_initval = initval;
+ to.to_maxval = initval;
+ r_xprt->rx_timeout = to;
+ xprt->timeout = &r_xprt->rx_timeout;
+ xprt->connect_timeout = connect_timeout;
+ }
+
+ if (reconnect_timeout < xprt->max_reconnect_timeout)
+ xprt->max_reconnect_timeout = reconnect_timeout;
+
+ spin_unlock(&xprt->transport_lock);
+}
+
+/**
+ * xprt_rdma_connect - schedule an attempt to reconnect
* @xprt: transport state
- * @task: RPC scheduler context
+ * @task: RPC scheduler context (unused)
*
*/
static void
xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
{
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+ unsigned long delay;
trace_xprtrdma_op_connect(r_xprt);
+
+ delay = 0;
if (r_xprt->rx_ep.rep_connected != 0) {
- /* Reconnect */
- schedule_delayed_work(&r_xprt->rx_connect_worker,
- xprt->reestablish_timeout);
- xprt->reestablish_timeout <<= 1;
- if (xprt->reestablish_timeout > RPCRDMA_MAX_REEST_TO)
- xprt->reestablish_timeout = RPCRDMA_MAX_REEST_TO;
- else if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO)
- xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
- } else {
- schedule_delayed_work(&r_xprt->rx_connect_worker, 0);
- if (!RPC_IS_ASYNC(task))
- flush_delayed_work(&r_xprt->rx_connect_worker);
+ delay = xprt_reconnect_delay(xprt);
+ xprt_reconnect_backoff(xprt, RPCRDMA_INIT_REEST_TO);
}
+ queue_delayed_work(xprtiod_workqueue, &r_xprt->rx_connect_worker,
+ delay);
}
/**
@@ -550,8 +585,11 @@ out_sleep:
static void
xprt_rdma_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *rqst)
{
+ struct rpcrdma_xprt *r_xprt =
+ container_of(xprt, struct rpcrdma_xprt, rx_xprt);
+
memset(rqst, 0, sizeof(*rqst));
- rpcrdma_buffer_put(rpcr_to_rdmar(rqst));
+ rpcrdma_buffer_put(&r_xprt->rx_buf, rpcr_to_rdmar(rqst));
rpc_wake_up_next(&xprt->backlog);
}
@@ -618,9 +656,16 @@ xprt_rdma_free(struct rpc_task *task)
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
- if (test_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags))
- rpcrdma_release_rqst(r_xprt, req);
trace_xprtrdma_op_free(task, req);
+
+ if (!list_empty(&req->rl_registered))
+ frwr_unmap_sync(r_xprt, req);
+
+ /* XXX: If the RPC is completing because of a signal and
+ * not because a reply was received, we ought to ensure
+ * that the Send completion has fired, so that memory
+ * involved with the Send is not still visible to the NIC.
+ */
}
/**
@@ -667,7 +712,6 @@ xprt_rdma_send_request(struct rpc_rqst *rqst)
goto drop_connection;
rqst->rq_xti