diff options
author | Linus Torvalds <torvalds@linux-foundation.org> | 2019-09-26 12:20:14 -0700 |
---|---|---|
committer | Linus Torvalds <torvalds@linux-foundation.org> | 2019-09-26 12:20:14 -0700 |
commit | 972a2bf7dfe39ebf49dd47f68d27c416392e53b1 (patch) | |
tree | 1fc6277f7b24c854b3c48a9e082b6625c18145a0 /net/sunrpc | |
parent | 7be3cb019db1cbd5fd5ffe6d64a23fefa4b6f229 (diff) | |
parent | a8fd0feeca35cb8f9ddd950191f4aeb777f52f89 (diff) |
Merge tag 'nfs-for-5.4-1' of git://git.linux-nfs.org/projects/anna/linux-nfs
Pull NFS client updates from Anna Schumaker:
"Stable bugfixes:
- Dequeue the request from the receive queue while we're re-encoding
# v4.20+
- Fix buffer handling of GSS MIC without slack # 5.1
Features:
- Increase xprtrdma maximum transport header and slot table sizes
- Add support for nfs4_call_sync() calls using a custom
rpc_task_struct
- Optimize the default readahead size
- Enable pNFS filelayout LAYOUTGET on OPEN
Other bugfixes and cleanups:
- Fix possible null-pointer dereferences and memory leaks
- Various NFS over RDMA cleanups
- Various NFS over RDMA comment updates
- Don't receive TCP data into a reset request buffer
- Don't try to parse incomplete RPC messages
- Fix congestion window race with disconnect
- Clean up pNFS return-on-close error handling
- Fixes for NFS4ERR_OLD_STATEID handling"
* tag 'nfs-for-5.4-1' of git://git.linux-nfs.org/projects/anna/linux-nfs: (53 commits)
pNFS/filelayout: enable LAYOUTGET on OPEN
NFS: Optimise the default readahead size
NFSv4: Handle NFS4ERR_OLD_STATEID in LOCKU
NFSv4: Handle NFS4ERR_OLD_STATEID in CLOSE/OPEN_DOWNGRADE
NFSv4: Fix OPEN_DOWNGRADE error handling
pNFS: Handle NFS4ERR_OLD_STATEID on layoutreturn by bumping the state seqid
NFSv4: Add a helper to increment stateid seqids
NFSv4: Handle RPC level errors in LAYOUTRETURN
NFSv4: Handle NFS4ERR_DELAY correctly in return-on-close
NFSv4: Clean up pNFS return-on-close error handling
pNFS: Ensure we do clear the return-on-close layout stateid on fatal errors
NFS: remove unused check for negative dentry
NFSv3: use nfs_add_or_obtain() to create and reference inodes
NFS: Refactor nfs_instantiate() for dentry referencing callers
SUNRPC: Fix congestion window race with disconnect
SUNRPC: Don't try to parse incomplete RPC messages
SUNRPC: Rename xdr_buf_read_netobj to xdr_buf_read_mic
SUNRPC: Fix buffer handling of GSS MIC without slack
SUNRPC: RPC level errors should always set task->tk_rpc_status
SUNRPC: Don't receive TCP data into a request buffer that has been reset
...
Diffstat (limited to 'net/sunrpc')
-rw-r--r-- | net/sunrpc/auth_gss/auth_gss.c | 2 | ||||
-rw-r--r-- | net/sunrpc/clnt.c | 26 | ||||
-rw-r--r-- | net/sunrpc/sched.c | 32 | ||||
-rw-r--r-- | net/sunrpc/xdr.c | 65 | ||||
-rw-r--r-- | net/sunrpc/xprt.c | 61 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/backchannel.c | 4 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/frwr_ops.c | 166 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/rpc_rdma.c | 71 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/transport.c | 15 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/verbs.c | 263 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/xprt_rdma.h | 59 | ||||
-rw-r--r-- | net/sunrpc/xprtsock.c | 8 |
12 files changed, 388 insertions, 384 deletions
diff --git a/net/sunrpc/auth_gss/auth_gss.c b/net/sunrpc/auth_gss/auth_gss.c index 4ce42c62458e..d75fddca44c9 100644 --- a/net/sunrpc/auth_gss/auth_gss.c +++ b/net/sunrpc/auth_gss/auth_gss.c @@ -1960,7 +1960,7 @@ gss_unwrap_resp_integ(struct rpc_task *task, struct rpc_cred *cred, if (xdr_buf_subsegment(rcv_buf, &integ_buf, data_offset, integ_len)) goto unwrap_failed; - if (xdr_buf_read_netobj(rcv_buf, &mic, mic_offset)) + if (xdr_buf_read_mic(rcv_buf, &mic, mic_offset)) goto unwrap_failed; maj_stat = gss_verify_mic(ctx->gc_gss_ctx, &integ_buf, &mic); if (maj_stat == GSS_S_CONTEXT_EXPIRED) diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c index a07b516e503a..f7f78566be46 100644 --- a/net/sunrpc/clnt.c +++ b/net/sunrpc/clnt.c @@ -1837,7 +1837,7 @@ call_allocate(struct rpc_task *task) return; } - rpc_exit(task, -ERESTARTSYS); + rpc_call_rpcerror(task, -ERESTARTSYS); } static int @@ -1862,6 +1862,7 @@ rpc_xdr_encode(struct rpc_task *task) req->rq_rbuffer, req->rq_rcvsize); + req->rq_reply_bytes_recvd = 0; req->rq_snd_buf.head[0].iov_len = 0; xdr_init_encode(&xdr, &req->rq_snd_buf, req->rq_snd_buf.head[0].iov_base, req); @@ -1881,6 +1882,8 @@ call_encode(struct rpc_task *task) if (!rpc_task_need_encode(task)) goto out; dprint_status(task); + /* Dequeue task from the receive queue while we're encoding */ + xprt_request_dequeue_xprt(task); /* Encode here so that rpcsec_gss can use correct sequence number. */ rpc_xdr_encode(task); /* Did the encode result in an error condition? */ @@ -2479,6 +2482,7 @@ call_decode(struct rpc_task *task) struct rpc_clnt *clnt = task->tk_client; struct rpc_rqst *req = task->tk_rqstp; struct xdr_stream xdr; + int err; dprint_status(task); @@ -2501,6 +2505,15 @@ call_decode(struct rpc_task *task) * before it changed req->rq_reply_bytes_recvd. */ smp_rmb(); + + /* + * Did we ever call xprt_complete_rqst()? If not, we should assume + * the message is incomplete. + */ + err = -EAGAIN; + if (!req->rq_reply_bytes_recvd) + goto out; + req->rq_rcv_buf.len = req->rq_private_buf.len; /* Check that the softirq receive buffer is valid */ @@ -2509,7 +2522,9 @@ call_decode(struct rpc_task *task) xdr_init_decode(&xdr, &req->rq_rcv_buf, req->rq_rcv_buf.head[0].iov_base, req); - switch (rpc_decode_header(task, &xdr)) { + err = rpc_decode_header(task, &xdr); +out: + switch (err) { case 0: task->tk_action = rpc_exit_task; task->tk_status = rpcauth_unwrap_resp(task, &xdr); @@ -2518,9 +2533,6 @@ call_decode(struct rpc_task *task) return; case -EAGAIN: task->tk_status = 0; - xdr_free_bvec(&req->rq_rcv_buf); - req->rq_reply_bytes_recvd = 0; - req->rq_rcv_buf.len = 0; if (task->tk_client->cl_discrtry) xprt_conditional_disconnect(req->rq_xprt, req->rq_connect_cookie); @@ -2561,7 +2573,7 @@ rpc_encode_header(struct rpc_task *task, struct xdr_stream *xdr) return 0; out_fail: trace_rpc_bad_callhdr(task); - rpc_exit(task, error); + rpc_call_rpcerror(task, error); return error; } @@ -2628,7 +2640,7 @@ out_garbage: return -EAGAIN; } out_err: - rpc_exit(task, error); + rpc_call_rpcerror(task, error); return error; out_unparsable: diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c index 1f275aba786f..360afe153193 100644 --- a/net/sunrpc/sched.c +++ b/net/sunrpc/sched.c @@ -541,33 +541,14 @@ rpc_wake_up_task_on_wq_queue_action_locked(struct workqueue_struct *wq, return NULL; } -static void -rpc_wake_up_task_on_wq_queue_locked(struct workqueue_struct *wq, - struct rpc_wait_queue *queue, struct rpc_task *task) -{ - rpc_wake_up_task_on_wq_queue_action_locked(wq, queue, task, NULL, NULL); -} - /* * Wake up a queued task while the queue lock is being held */ -static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue, struct rpc_task *task) -{ - rpc_wake_up_task_on_wq_queue_locked(rpciod_workqueue, queue, task); -} - -/* - * Wake up a task on a specific queue - */ -void rpc_wake_up_queued_task_on_wq(struct workqueue_struct *wq, - struct rpc_wait_queue *queue, - struct rpc_task *task) +static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue, + struct rpc_task *task) { - if (!RPC_IS_QUEUED(task)) - return; - spin_lock(&queue->lock); - rpc_wake_up_task_on_wq_queue_locked(wq, queue, task); - spin_unlock(&queue->lock); + rpc_wake_up_task_on_wq_queue_action_locked(rpciod_workqueue, queue, + task, NULL, NULL); } /* @@ -930,8 +911,10 @@ static void __rpc_execute(struct rpc_task *task) /* * Signalled tasks should exit rather than sleep. */ - if (RPC_SIGNALLED(task)) + if (RPC_SIGNALLED(task)) { + task->tk_rpc_status = -ERESTARTSYS; rpc_exit(task, -ERESTARTSYS); + } /* * The queue->lock protects against races with @@ -967,6 +950,7 @@ static void __rpc_execute(struct rpc_task *task) */ dprintk("RPC: %5u got signal\n", task->tk_pid); set_bit(RPC_TASK_SIGNALLED, &task->tk_runstate); + task->tk_rpc_status = -ERESTARTSYS; rpc_exit(task, -ERESTARTSYS); } dprintk("RPC: %5u sync task resuming\n", task->tk_pid); diff --git a/net/sunrpc/xdr.c b/net/sunrpc/xdr.c index 48c93b9e525e..14ba9e72a204 100644 --- a/net/sunrpc/xdr.c +++ b/net/sunrpc/xdr.c @@ -560,7 +560,7 @@ EXPORT_SYMBOL_GPL(xdr_init_encode); * required at the end of encoding, or any other time when the xdr_buf * data might be read. */ -void xdr_commit_encode(struct xdr_stream *xdr) +inline void xdr_commit_encode(struct xdr_stream *xdr) { int shift = xdr->scratch.iov_len; void *page; @@ -1236,43 +1236,60 @@ xdr_encode_word(struct xdr_buf *buf, unsigned int base, u32 obj) } EXPORT_SYMBOL_GPL(xdr_encode_word); -/* If the netobj starting offset bytes from the start of xdr_buf is contained - * entirely in the head or the tail, set object to point to it; otherwise - * try to find space for it at the end of the tail, copy it there, and - * set obj to point to it. */ -int xdr_buf_read_netobj(struct xdr_buf *buf, struct xdr_netobj *obj, unsigned int offset) +/** + * xdr_buf_read_mic() - obtain the address of the GSS mic from xdr buf + * @buf: pointer to buffer containing a mic + * @mic: on success, returns the address of the mic + * @offset: the offset in buf where mic may be found + * + * This function may modify the xdr buf if the mic is found to be straddling + * a boundary between head, pages, and tail. On success the mic can be read + * from the address returned. There is no need to free the mic. + * + * Return: Success returns 0, otherwise an integer error. + */ +int xdr_buf_read_mic(struct xdr_buf *buf, struct xdr_netobj *mic, unsigned int offset) { struct xdr_buf subbuf; + unsigned int boundary; - if (xdr_decode_word(buf, offset, &obj->len)) + if (xdr_decode_word(buf, offset, &mic->len)) return -EFAULT; - if (xdr_buf_subsegment(buf, &subbuf, offset + 4, obj->len)) + offset += 4; + + /* Is the mic partially in the head? */ + boundary = buf->head[0].iov_len; + if (offset < boundary && (offset + mic->len) > boundary) + xdr_shift_buf(buf, boundary - offset); + + /* Is the mic partially in the pages? */ + boundary += buf->page_len; + if (offset < boundary && (offset + mic->len) > boundary) + xdr_shrink_pagelen(buf, boundary - offset); + + if (xdr_buf_subsegment(buf, &subbuf, offset, mic->len)) return -EFAULT; - /* Is the obj contained entirely in the head? */ - obj->data = subbuf.head[0].iov_base; - if (subbuf.head[0].iov_len == obj->len) + /* Is the mic contained entirely in the head? */ + mic->data = subbuf.head[0].iov_base; + if (subbuf.head[0].iov_len == mic->len) return 0; - /* ..or is the obj contained entirely in the tail? */ - obj->data = subbuf.tail[0].iov_base; - if (subbuf.tail[0].iov_len == obj->len) + /* ..or is the mic contained entirely in the tail? */ + mic->data = subbuf.tail[0].iov_base; + if (subbuf.tail[0].iov_len == mic->len) return 0; - /* use end of tail as storage for obj: - * (We don't copy to the beginning because then we'd have - * to worry about doing a potentially overlapping copy. - * This assumes the object is at most half the length of the - * tail.) */ - if (obj->len > buf->buflen - buf->len) + /* Find a contiguous area in @buf to hold all of @mic */ + if (mic->len > buf->buflen - buf->len) return -ENOMEM; if (buf->tail[0].iov_len != 0) - obj->data = buf->tail[0].iov_base + buf->tail[0].iov_len; + mic->data = buf->tail[0].iov_base + buf->tail[0].iov_len; else - obj->data = buf->head[0].iov_base + buf->head[0].iov_len; - __read_bytes_from_xdr_buf(&subbuf, obj->data, obj->len); + mic->data = buf->head[0].iov_base + buf->head[0].iov_len; + __read_bytes_from_xdr_buf(&subbuf, mic->data, mic->len); return 0; } -EXPORT_SYMBOL_GPL(xdr_buf_read_netobj); +EXPORT_SYMBOL_GPL(xdr_buf_read_mic); /* Returns 0 on success, or else a negative error code. */ static int diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index 2e71f5455c6c..8a45b3ccc313 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -456,6 +456,12 @@ void xprt_release_rqst_cong(struct rpc_task *task) } EXPORT_SYMBOL_GPL(xprt_release_rqst_cong); +static void xprt_clear_congestion_window_wait_locked(struct rpc_xprt *xprt) +{ + if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state)) + __xprt_lock_write_next_cong(xprt); +} + /* * Clear the congestion window wait flag and wake up the next * entry on xprt->sending @@ -671,6 +677,7 @@ void xprt_disconnect_done(struct rpc_xprt *xprt) spin_lock(&xprt->transport_lock); xprt_clear_connected(xprt); xprt_clear_write_space_locked(xprt); + xprt_clear_congestion_window_wait_locked(xprt); xprt_wake_pending_tasks(xprt, -ENOTCONN); spin_unlock(&xprt->transport_lock); } @@ -1324,6 +1331,36 @@ xprt_request_dequeue_transmit(struct rpc_task *task) } /** + * xprt_request_dequeue_xprt - remove a task from the transmit+receive queue + * @task: pointer to rpc_task + * + * Remove a task from the transmit and receive queues, and ensure that + * it is not pinned by the receive work item. + */ +void +xprt_request_dequeue_xprt(struct rpc_task *task) +{ + struct rpc_rqst *req = task->tk_rqstp; + struct rpc_xprt *xprt = req->rq_xprt; + + if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) || + test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) || + xprt_is_pinned_rqst(req)) { + spin_lock(&xprt->queue_lock); + xprt_request_dequeue_transmit_locked(task); + xprt_request_dequeue_receive_locked(task); + while (xprt_is_pinned_rqst(req)) { + set_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate); + spin_unlock(&xprt->queue_lock); + xprt_wait_on_pinned_rqst(req); + spin_lock(&xprt->queue_lock); + clear_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate); + } + spin_unlock(&xprt->queue_lock); + } +} + +/** * xprt_request_prepare - prepare an encoded request for transport * @req: pointer to rpc_rqst * @@ -1747,28 +1784,6 @@ void xprt_retry_reserve(struct rpc_task *task) xprt_do_reserve(xprt, task); } -static void -xprt_request_dequeue_all(struct rpc_task *task, struct rpc_rqst *req) -{ - struct rpc_xprt *xprt = req->rq_xprt; - - if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) || - test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) || - xprt_is_pinned_rqst(req)) { - spin_lock(&xprt->queue_lock); - xprt_request_dequeue_transmit_locked(task); - xprt_request_dequeue_receive_locked(task); - while (xprt_is_pinned_rqst(req)) { - set_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate); - spin_unlock(&xprt->queue_lock); - xprt_wait_on_pinned_rqst(req); - spin_lock(&xprt->queue_lock); - clear_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate); - } - spin_unlock(&xprt->queue_lock); - } -} - /** * xprt_release - release an RPC request slot * @task: task which is finished with the slot @@ -1788,7 +1803,7 @@ void xprt_release(struct rpc_task *task) } xprt = req->rq_xprt; - xprt_request_dequeue_all(task, req); + xprt_request_dequeue_xprt(task); spin_lock(&xprt->transport_lock); xprt->ops->release_xprt(xprt, task); if (xprt->ops->release_request) diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c index 59e624b1d7a0..50e075fcdd8f 100644 --- a/net/sunrpc/xprtrdma/backchannel.c +++ b/net/sunrpc/xprtrdma/backchannel.c @@ -54,9 +54,7 @@ size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *xprt) unsigned int xprt_rdma_bc_max_slots(struct rpc_xprt *xprt) { - struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); - - return r_xprt->rx_buf.rb_bc_srv_max_requests; + return RPCRDMA_BACKWARD_WRS >> 1; } static int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst) diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c index 0b6dad7580a1..30065a28628c 100644 --- a/net/sunrpc/xprtrdma/frwr_ops.c +++ b/net/sunrpc/xprtrdma/frwr_ops.c @@ -7,67 +7,37 @@ /* Lightweight memory registration using Fast Registration Work * Requests (FRWR). * - * FRWR features ordered asynchronous registration and deregistration - * of arbitrarily sized memory regions. This is the fastest and safest + * FRWR features ordered asynchronous registration and invalidation + * of arbitrarily-sized memory regions. This is the fastest and safest * but most complex memory registration mode. */ /* Normal operation * - * A Memory Region is prepared for RDMA READ or WRITE using a FAST_REG + * A Memory Region is prepared for RDMA Read or Write using a FAST_REG * Work Request (frwr_map). When the RDMA operation is finished, this * Memory Region is invalidated using a LOCAL_INV Work Request - * (frwr_unmap_sync). + * (frwr_unmap_async and frwr_unmap_sync). * - * Typically these Work Requests are not signaled, and neither are RDMA - * SEND Work Requests (with the exception of signaling occasionally to - * prevent provider work queue overflows). This greatly reduces HCA + * Typically FAST_REG Work Requests are not signaled, and neither are + * RDMA Send Work Requests (with the exception of signaling occasionally + * to prevent provider work queue overflows). This greatly reduces HCA * interrupt workload. - * - * As an optimization, frwr_unmap marks MRs INVALID before the - * LOCAL_INV WR is posted. If posting succeeds, the MR is placed on - * rb_mrs immediately so that no work (like managing a linked list - * under a spinlock) is needed in the completion upcall. - * - * But this means that frwr_map() can occasionally encounter an MR - * that is INVALID but the LOCAL_INV WR has not completed. Work Queue - * ordering prevents a subsequent FAST_REG WR from executing against - * that MR while it is still being invalidated. */ /* Transport recovery * - * ->op_map and the transport connect worker cannot run at the same - * time, but ->op_unmap can fire while the transport connect worker - * is running. Thus MR recovery is handled in ->op_map, to guarantee - * that recovered MRs are owned by a sending RPC, and not one where - * ->op_unmap could fire at the same time transport reconnect is - * being done. - * - * When the underlying transport disconnects, MRs are left in one of - * four states: - * - * INVALID: The MR was not in use before the QP entered ERROR state. - * - * VALID: The MR was registered before the QP entered ERROR state. - * - * FLUSHED_FR: The MR was being registered when the QP entered ERROR - * state, and the pending WR was flushed. - * - * FLUSHED_LI: The MR was being invalidated when the QP entered ERROR - * state, and the pending WR was flushed. - * - * When frwr_map encounters FLUSHED and VALID MRs, they are recovered - * with ib_dereg_mr and then are re-initialized. Because MR recovery - * allocates fresh resources, it is deferred to a workqueue, and the - * recovered MRs are placed back on the rb_mrs list when recovery is - * complete. frwr_map allocates another MR for the current RPC while - * the broken MR is reset. - * - * To ensure that frwr_map doesn't encounter an MR that is marked - * INVALID but that is about to be flushed due to a previous transport - * disconnect, the transport connect worker attempts to drain all - * pending send queue WRs before the transport is reconnected. + * frwr_map and frwr_unmap_* cannot run at the same time the transport + * connect worker is running. The connect worker holds the transport + * send lock, just as ->send_request does. This prevents frwr_map and + * the connect worker from running concurrently. When a connection is + * closed, the Receive completion queue is drained before the allowing + * the connect worker to get control. This prevents frwr_unmap and the + * connect worker from running concurrently. + * + * When the underlying transport disconnects, MRs that are in flight + * are flushed and are likely unusable. Thus all flushed MRs are + * destroyed. New MRs are created on demand. */ #include <linux/sunrpc/rpc_rdma.h> @@ -118,15 +88,8 @@ void frwr_release_mr(struct rpcrdma_mr *mr) kfree(mr); } -/* MRs are dynamically allocated, so simply clean up and release the MR. - * A replacement MR will subsequently be allocated on demand. - */ -static void -frwr_mr_recycle_worker(struct work_struct *work) +static void frwr_mr_recycle(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr *mr) { - struct rpcrdma_mr *mr = container_of(work, struct rpcrdma_mr, mr_recycle); - struct rpcrdma_xprt *r_xprt = mr->mr_xprt; - trace_xprtrdma_mr_recycle(mr); if (mr->mr_dir != DMA_NONE) { @@ -136,14 +99,40 @@ frwr_mr_recycle_worker(struct work_struct *work) mr->mr_dir = DMA_NONE; } - spin_lock(&r_xprt->rx_buf.rb_mrlock); + spin_lock(&r_xprt->rx_buf.rb_lock); list_del(&mr->mr_all); r_xprt->rx_stats.mrs_recycled++; - spin_unlock(&r_xprt->rx_buf.rb_mrlock); + spin_unlock(&r_xprt->rx_buf.rb_lock); frwr_release_mr(mr); } +/* MRs are dynamically allocated, so simply clean up and release the MR. + * A replacement MR will subsequently be allocated on demand. + */ +static void +frwr_mr_recycle_worker(struct work_struct *work) +{ + struct rpcrdma_mr *mr = container_of(work, struct rpcrdma_mr, + mr_recycle); + + frwr_mr_recycle(mr->mr_xprt, mr); +} + +/* frwr_recycle - Discard MRs + * @req: request to reset + * + * Used after a reconnect. These MRs could be in flight, we can't + * tell. Safe thing to do is release them. + */ +void frwr_recycle(struct rpcrdma_req *req) +{ + struct rpcrdma_mr *mr; + + while ((mr = rpcrdma_mr_pop(&req->rl_registered))) + frwr_mr_recycle(mr->mr_xprt, mr); +} + /* frwr_reset - Place MRs back on the free list * @req: request to reset * @@ -156,12 +145,10 @@ frwr_mr_recycle_worker(struct work_struct *work) */ void frwr_reset(struct rpcrdma_req *req) { - while (!list_empty(&req->rl_registered)) { - struct rpcrdma_mr *mr; + struct rpcrdma_mr *mr; - mr = rpcrdma_mr_pop(&req->rl_registered); - rpcrdma_mr_unmap_and_put(mr); - } + while ((mr = rpcrdma_mr_pop(&req->rl_registered))) + rpcrdma_mr_put(mr); } /** @@ -179,11 +166,14 @@ int frwr_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr) struct ib_mr *frmr; int rc; + /* NB: ib_alloc_mr and device drivers typically allocate + * memory with GFP_KERNEL. + */ frmr = ib_alloc_mr(ia->ri_pd, ia->ri_mrtype, depth); if (IS_ERR(frmr)) goto out_mr_err; - sg = kcalloc(depth, sizeof(*sg), GFP_KERNEL); + sg = kcalloc(depth, sizeof(*sg), GFP_NOFS); if (!sg) goto out_list_err; @@ -203,8 +193,6 @@ out_mr_err: return rc; out_list_err: - dprintk("RPC: %s: sg allocation failure\n", - __func__); ib_dereg_mr(frmr); return -ENOMEM; } @@ -290,8 +278,8 @@ int frwr_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep) ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS; ep->rep_attr.cap.max_recv_wr += 1; /* for ib_drain_rq */ - ia->ri_max_segs = max_t(unsigned int, 1, RPCRDMA_MAX_DATA_SEGS / - ia->ri_max_frwr_depth); + ia->ri_max_segs = + DIV_ROUND_UP(RPCRDMA_MAX_DATA_SEGS, ia->ri_max_frwr_depth); /* Reply chunks require segments for head and tail buffers */ ia->ri_max_segs += 2; if (ia->ri_max_segs > RPCRDMA_MAX_HDR_SEGS) @@ -323,31 +311,25 @@ size_t frwr_maxpages(struct rpcrdma_xprt *r_xprt) * @nsegs: number of segments remaining * @writing: true when RDMA Write will be used * @xid: XID of RPC using the registered memory - * @out: initialized MR + * @mr: MR to fill in * * Prepare a REG_MR Work Request to register a memory region * for remote access via RDMA READ or RDMA WRITE. * * Returns the next segment or a negative errno pointer. - * On success, the prepared MR is planted in @out. + * On success, @mr is filled in. */ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, int nsegs, bool writing, __be32 xid, - struct rpcrdma_mr **out) + struct rpcrdma_mr *mr) { struct rpcrdma_ia *ia = &r_xprt->rx_ia; - bool holes_ok = ia->ri_mrtype == IB_MR_TYPE_SG_GAPS; - struct rpcrdma_mr *mr; - struct ib_mr *ibmr; struct ib_reg_wr *reg_wr; + struct ib_mr *ibmr; int i, n; u8 key; - mr = rpcrdma_mr_get(r_xprt); - if (!mr) - goto out_getmr_err; - if (nsegs > ia->ri_max_frwr_depth) nsegs = ia->ri_max_frwr_depth; for (i = 0; i < nsegs;) { @@ -362,7 +344,7 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt, ++seg; ++i; - if (holes_ok) + if (ia->ri_mrtype == IB_MR_TYPE_SG_GAPS) continue; if ((i < nsegs && offset_in_page(seg->mr_offset)) || offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len)) @@ -397,22 +379,15 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt, mr->mr_offset = ibmr->iova; trace_xprtrdma_mr_map(mr); - *out = mr; return seg; -out_getmr_err: - xprt_wait_for_buffer_space(&r_xprt->rx_xprt); - return ERR_PTR(-EAGAIN); - out_dmamap_err: mr->mr_dir = DMA_NONE; trace_xprtrdma_frwr_sgerr(mr, i); - rpcrdma_mr_put(mr); return ERR_PTR(-EIO); out_mapmr_err: trace_xprtrdma_frwr_maperr(mr, n); - rpcrdma_mr_recycle(mr); return ERR_PTR(-EIO); } @@ -485,7 +460,7 @@ void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs) if (mr->mr_handle == rep->rr_inv_rkey) { list_del_init(&mr->mr_list); trace_xprtrdma_mr_remoteinv(mr); - rpcrdma_mr_unmap_and_put(mr); + rpcrdma_mr_put(mr); break; /* only one invalidated MR per RPC */ } } @@ -495,7 +470,7 @@ static void __frwr_release_mr(struct ib_wc *wc, struct rpcrdma_mr *mr) if (wc->status != IB_WC_SUCCESS) rpcrdma_mr_recycle(mr); else - rpcrdma_mr_unmap_and_put(mr); + rpcrdma_mr_put(mr); } /** @@ -532,8 +507,8 @@ static void frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc) /* WARNING: Only wr_cqe and status are reliable at this point */ trace_xprtrdma_wc_li_wake(wc, frwr); - complete(&frwr->fr_linv_done); __frwr_release_mr(wc, mr); + complete(&frwr->fr_linv_done); } /** @@ -562,8 +537,7 @@ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) */ frwr = NULL; prev = &first; - while (!list_empty(&req->rl_registered)) { - mr = rpcrdma_mr_pop(&req->rl_registered); + while ((mr = rpcrdma_mr_pop(&req->rl_registered))) { trace_xprtrdma_mr_localinv(mr); r_xprt->rx_stats.local_inv_needed++; @@ -632,11 +606,15 @@ static void frwr_wc_localinv_done(struct ib_cq *cq, struct ib_wc *wc) struct rpcrdma_frwr *frwr = container_of(cqe, struct rpcrdma_frwr, fr_cqe); struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr); + struct rpcrdma_rep *rep = mr->mr_req->rl_reply; /* WARNING: Only wr_cqe and status are reliable at this point */ trace_xprtrdma_wc_li_done(wc, frwr); - rpcrdma_complete_rqst(frwr->fr_req->rl_reply); __frwr_release_mr(wc, mr); + + /* Ensure @rep is generated before __frwr_release_mr */ + smp_rmb(); + rpcrdma_complete_rqst(rep); } /** @@ -662,15 +640,13 @@ void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) */ frwr = NULL; prev = &first; - while (!list_empty(&req->rl_registered)) { - mr = rpcrdma_mr_pop(&req->rl_registered); + while ((mr = rpcrdma_mr_pop(&req->rl_registered))) { trace_xprtrdma_mr_localinv(mr); r_xprt->rx_stats.local_inv_needed++; frwr = &mr->frwr; frwr->fr_cqe.done = frwr_wc_localinv; - frwr->fr_req = req; last = &frwr->fr_invwr; last->next = NULL; last->wr_cqe = &frwr->fr_cqe; diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index 4345e6912392..b86b5fd62d9f 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -342,6 +342,32 @@ encode_read_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr, return 0; } +static struct rpcrdma_mr_seg *rpcrdma_mr_prepare(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_req *req, + struct rpcrdma_mr_seg *seg, + int nsegs, bool writing, + struct rpcrdma_mr **mr) +{ + *mr = rpcrdma_mr_pop(&req->rl_free_mrs); + if (!*mr) { + *mr = rpcrdma_mr_get(r_xprt); + if (!*mr) + goto out_getmr_err; + trace_xprtrdma_mr_get(req); + (*mr)->mr_req = req; + } + + rpcrdma_mr_push(*mr, &req->rl_registered); + return frwr_map(r_xprt, seg, nsegs, writing, req->rl_slot.rq_xid, *mr); + +out_getmr_err: + trace_xprtrdma_nomrs(req); + xprt_wait_for_buffer_space(&r_xprt->rx_xprt); + if (r_xprt->rx_ep.rep_connected != -ENODEV) + schedule_work(&r_xprt->rx_buf.rb_refresh_worker); + return ERR_PTR(-EAGAIN); +} + /* Register and XDR encode the Read list. Supports encoding a list of read * segments that belong to a single read chunk. * @@ -356,9 +382,10 @@ encode_read_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr, * * Only a single @pos value is currently supported. */ -static noinline int -rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, - struct rpc_rqst *rqst, enum rpcrdma_chunktype rtype) +static int rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_req *req, + struct rpc_rqst *rqst, + enum rpcrdma_chunktype rtype) { struct xdr_stream *xdr = &req->rl_stream; struct rpcrdma_mr_seg *seg; @@ -379,10 +406,9 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, return nsegs; do { - seg = frwr_map(r_xprt, seg, nsegs, false, rqst->rq_xid, &mr); + seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, false, &mr); if (IS_ERR(seg)) return PTR_ERR(seg); - rpcrdma_mr_push(mr, &req->rl_registered); if (encode_read_segment(xdr, mr, pos) < 0) return -EMSGSIZE; @@ -411,9 +437,10 @@ done: * * Only a single Write chunk is currently supported. */ -static noinline int -rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, - struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype) +static int rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_req *req, + struct rpc_rqst *rqst, + enum rpcrdma_chunktype wtype) { struct xdr_stream *xdr = &req->rl_stream; struct rpcrdma_mr_seg *seg; @@ -440,10 +467,9 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, nchunks = 0; do { - seg = frwr_map(r_xprt, seg, nsegs, true, rqst->rq_xid, &mr); + seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, true, &mr); if (IS_ERR(seg)) return PTR_ERR(seg); - rpcrdma_mr_push(mr, &req->rl_registered); if (encode_rdma_segment(xdr, mr) < 0) return -EMSGSIZE; @@ -474,9 +500,10 @@ done: * Returns zero on success, or a negative errno if a failure occurred. * @xdr is advanced to the next position in the stream. */ -static noinline int -rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, - struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype) +static int rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_req *req, + struct rpc_rqst *rqst, + enum rpcrdma_chunktype wtype) { struct xdr_stream *xdr = &req->rl_stream; struct rpcrdma_mr_seg *seg; @@ -501,10 +528,9 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, nchunks = 0; do { - seg = frwr_map(r_xprt, seg, nsegs, true, rqst->rq_xid, &mr); + seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, true, &mr); if (IS_ERR(seg)) return PTR_ERR(seg); - rpcrdma_mr_push(mr, &req->rl_registered); if (encode_rdma_segment(xdr, mr) < 0) return -EMSGSIZE; @@ -841,12 +867,7 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst) * chunks. Very likely the connection has been replaced, * so these registrations are invalid and unusable. */ - while (unlikely(!list_empty(&req->rl_registered))) { - struct rpcrdma_mr *mr; - - mr = rpcrdma_mr_pop(&req->rl_registered); - rpcrdma_mr_recycle(mr); - } + frwr_recycle(req); /* This implementation supports the following combinations * of chunk lists in one RPC-over-RDMA Call message: @@ -1240,8 +1261,6 @@ void rpcrdma_complete_rqst(struct rpcrdma_rep *rep) struct rpc_rqst *rqst = rep->rr_rqst; int status; - xprt->reestablish_timeout = 0; - switch (rep->rr_proc) { case rdma_msg: status = rpcrdma_decode_msg(r_xprt, rep, rqst); @@ -1300,6 +1319,12 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep) u32 credits; __be32 *p; + /* Any data means we had a useful conversation, so + * then we don't need to delay the next reconnect. + */ + if (xprt->reestablish_timeout) + xprt->reestablish_timeout = 0; + /* Fixed transport header fields */ xdr_init_decode(&rep->rr_stream, &rep->rr_hdrbuf, rep->rr_hdrbuf.head[0].iov_base, NULL); diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c index 2ec349ed4770..160558b4135e 100644 --- a/net/sunrpc/xprtrdma/transport.c +++ b/ |