From 9dc6edcf676fe188430e8b119f91280bbf285163 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Wed, 22 Aug 2018 14:24:16 -0400 Subject: SUNRPC: Clean up initialisation of the struct rpc_rqst Move the initialisation back into xprt.c. Signed-off-by: Trond Myklebust --- net/sunrpc/clnt.c | 1 - net/sunrpc/xprt.c | 91 +++++++++++++++++++++++++++++++------------------------ 2 files changed, 51 insertions(+), 41 deletions(-) (limited to 'net') diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c index 8ea2f5fadd96..bc9d020bf71f 100644 --- a/net/sunrpc/clnt.c +++ b/net/sunrpc/clnt.c @@ -1558,7 +1558,6 @@ call_reserveresult(struct rpc_task *task) task->tk_status = 0; if (status >= 0) { if (task->tk_rqstp) { - xprt_request_init(task); task->tk_action = call_refresh; return; } diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index a8db2e3f8904..6aa09edc9567 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -1250,6 +1250,55 @@ void xprt_free(struct rpc_xprt *xprt) } EXPORT_SYMBOL_GPL(xprt_free); +static __be32 +xprt_alloc_xid(struct rpc_xprt *xprt) +{ + __be32 xid; + + spin_lock(&xprt->reserve_lock); + xid = (__force __be32)xprt->xid++; + spin_unlock(&xprt->reserve_lock); + return xid; +} + +static void +xprt_init_xid(struct rpc_xprt *xprt) +{ + xprt->xid = prandom_u32(); +} + +static void +xprt_request_init(struct rpc_task *task) +{ + struct rpc_xprt *xprt = task->tk_xprt; + struct rpc_rqst *req = task->tk_rqstp; + + INIT_LIST_HEAD(&req->rq_list); + req->rq_timeout = task->tk_client->cl_timeout->to_initval; + req->rq_task = task; + req->rq_xprt = xprt; + req->rq_buffer = NULL; + req->rq_xid = xprt_alloc_xid(xprt); + req->rq_connect_cookie = xprt->connect_cookie - 1; + req->rq_bytes_sent = 0; + req->rq_snd_buf.len = 0; + req->rq_snd_buf.buflen = 0; + req->rq_rcv_buf.len = 0; + req->rq_rcv_buf.buflen = 0; + req->rq_release_snd_buf = NULL; + xprt_reset_majortimeo(req); + dprintk("RPC: %5u reserved req %p xid %08x\n", task->tk_pid, + req, ntohl(req->rq_xid)); +} + +static void +xprt_do_reserve(struct rpc_xprt *xprt, struct rpc_task *task) +{ + xprt->ops->alloc_slot(xprt, task); + if (task->tk_rqstp != NULL) + xprt_request_init(task); +} + /** * xprt_reserve - allocate an RPC request slot * @task: RPC task requesting a slot allocation @@ -1269,7 +1318,7 @@ void xprt_reserve(struct rpc_task *task) task->tk_timeout = 0; task->tk_status = -EAGAIN; if (!xprt_throttle_congested(xprt, task)) - xprt->ops->alloc_slot(xprt, task); + xprt_do_reserve(xprt, task); } /** @@ -1291,45 +1340,7 @@ void xprt_retry_reserve(struct rpc_task *task) task->tk_timeout = 0; task->tk_status = -EAGAIN; - xprt->ops->alloc_slot(xprt, task); -} - -static inline __be32 xprt_alloc_xid(struct rpc_xprt *xprt) -{ - __be32 xid; - - spin_lock(&xprt->reserve_lock); - xid = (__force __be32)xprt->xid++; - spin_unlock(&xprt->reserve_lock); - return xid; -} - -static inline void xprt_init_xid(struct rpc_xprt *xprt) -{ - xprt->xid = prandom_u32(); -} - -void xprt_request_init(struct rpc_task *task) -{ - struct rpc_xprt *xprt = task->tk_xprt; - struct rpc_rqst *req = task->tk_rqstp; - - INIT_LIST_HEAD(&req->rq_list); - req->rq_timeout = task->tk_client->cl_timeout->to_initval; - req->rq_task = task; - req->rq_xprt = xprt; - req->rq_buffer = NULL; - req->rq_xid = xprt_alloc_xid(xprt); - req->rq_connect_cookie = xprt->connect_cookie - 1; - req->rq_bytes_sent = 0; - req->rq_snd_buf.len = 0; - req->rq_snd_buf.buflen = 0; - req->rq_rcv_buf.len = 0; - req->rq_rcv_buf.buflen = 0; - req->rq_release_snd_buf = NULL; - xprt_reset_majortimeo(req); - dprintk("RPC: %5u reserved req %p xid %08x\n", task->tk_pid, - req, ntohl(req->rq_xid)); + xprt_do_reserve(xprt, task); } /** -- cgit v1.2.3 From 9ee94d3ed6a4f75dbf0a022927021a42a24dbdf8 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Tue, 28 Aug 2018 16:27:31 -0400 Subject: SUNRPC: If there is no reply expected, bail early from call_decode Signed-off-by: Trond Myklebust --- net/sunrpc/clnt.c | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) (limited to 'net') diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c index bc9d020bf71f..4f1ec8013332 100644 --- a/net/sunrpc/clnt.c +++ b/net/sunrpc/clnt.c @@ -2260,6 +2260,11 @@ call_decode(struct rpc_task *task) dprint_status(task); + if (!decode) { + task->tk_action = rpc_exit_task; + return; + } + if (task->tk_flags & RPC_CALL_MAJORSEEN) { if (clnt->cl_chatty) { printk(KERN_NOTICE "%s: server %s OK\n", @@ -2297,13 +2302,11 @@ call_decode(struct rpc_task *task) goto out_retry; return; } - task->tk_action = rpc_exit_task; - if (decode) { - task->tk_status = rpcauth_unwrap_resp(task, decode, req, p, - task->tk_msg.rpc_resp); - } + task->tk_status = rpcauth_unwrap_resp(task, decode, req, p, + task->tk_msg.rpc_resp); + dprintk("RPC: %5u call_decode result %d\n", task->tk_pid, task->tk_status); return; -- cgit v1.2.3 From 3021a5bbbf0aa0252f2993b84ee903a0eca0b690 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Tue, 14 Aug 2018 13:50:21 -0400 Subject: SUNRPC: The transmitted message must lie in the RPCSEC window of validity If a message has been encoded using RPCSEC_GSS, the server is maintaining a window of sequence numbers that it considers valid. The client should normally be tracking that window, and needs to verify that the sequence number used by the message being transmitted still lies inside the window of validity. So far, we've been able to assume this condition would be realised automatically, since the client has been encoding the message only after taking the socket lock. Once we change that condition, we will need the explicit check. Signed-off-by: Trond Myklebust --- net/sunrpc/auth.c | 10 ++++++++++ net/sunrpc/auth_gss/auth_gss.c | 41 +++++++++++++++++++++++++++++++++++++++++ net/sunrpc/clnt.c | 3 +++ net/sunrpc/xprt.c | 7 +++++++ 4 files changed, 61 insertions(+) (limited to 'net') diff --git a/net/sunrpc/auth.c b/net/sunrpc/auth.c index 305ecea92170..59df5cdba0ac 100644 --- a/net/sunrpc/auth.c +++ b/net/sunrpc/auth.c @@ -817,6 +817,16 @@ rpcauth_unwrap_resp(struct rpc_task *task, kxdrdproc_t decode, void *rqstp, return rpcauth_unwrap_req_decode(decode, rqstp, data, obj); } +bool +rpcauth_xmit_need_reencode(struct rpc_task *task) +{ + struct rpc_cred *cred = task->tk_rqstp->rq_cred; + + if (!cred || !cred->cr_ops->crneed_reencode) + return false; + return cred->cr_ops->crneed_reencode(task); +} + int rpcauth_refreshcred(struct rpc_task *task) { diff --git a/net/sunrpc/auth_gss/auth_gss.c b/net/sunrpc/auth_gss/auth_gss.c index 21c0aa0a0d1d..c898a7c75e84 100644 --- a/net/sunrpc/auth_gss/auth_gss.c +++ b/net/sunrpc/auth_gss/auth_gss.c @@ -1984,6 +1984,46 @@ gss_unwrap_req_decode(kxdrdproc_t decode, struct rpc_rqst *rqstp, return decode(rqstp, &xdr, obj); } +static bool +gss_seq_is_newer(u32 new, u32 old) +{ + return (s32)(new - old) > 0; +} + +static bool +gss_xmit_need_reencode(struct rpc_task *task) +{ + struct rpc_rqst *req = task->tk_rqstp; + struct rpc_cred *cred = req->rq_cred; + struct gss_cl_ctx *ctx = gss_cred_get_ctx(cred); + u32 win, seq_xmit; + bool ret = true; + + if (!ctx) + return true; + + if (gss_seq_is_newer(req->rq_seqno, READ_ONCE(ctx->gc_seq))) + goto out; + + seq_xmit = READ_ONCE(ctx->gc_seq_xmit); + while (gss_seq_is_newer(req->rq_seqno, seq_xmit)) { + u32 tmp = seq_xmit; + + seq_xmit = cmpxchg(&ctx->gc_seq_xmit, tmp, req->rq_seqno); + if (seq_xmit == tmp) { + ret = false; + goto out; + } + } + + win = ctx->gc_win; + if (win > 0) + ret = !gss_seq_is_newer(req->rq_seqno, seq_xmit - win); +out: + gss_put_ctx(ctx); + return ret; +} + static int gss_unwrap_resp(struct rpc_task *task, kxdrdproc_t decode, void *rqstp, __be32 *p, void *obj) @@ -2052,6 +2092,7 @@ static const struct rpc_credops gss_credops = { .crunwrap_resp = gss_unwrap_resp, .crkey_timeout = gss_key_timeout, .crstringify_acceptor = gss_stringify_acceptor, + .crneed_reencode = gss_xmit_need_reencode, }; static const struct rpc_credops gss_nullops = { diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c index 4f1ec8013332..d41b5ac1d4e8 100644 --- a/net/sunrpc/clnt.c +++ b/net/sunrpc/clnt.c @@ -2184,6 +2184,9 @@ call_status(struct rpc_task *task) /* shutdown or soft timeout */ rpc_exit(task, status); break; + case -EBADMSG: + task->tk_action = call_transmit; + break; default: if (clnt->cl_chatty) printk("%s: RPC call returned error %d\n", diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index 6aa09edc9567..3973e10ea2bd 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -1014,6 +1014,13 @@ void xprt_transmit(struct rpc_task *task) dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen); if (!req->rq_reply_bytes_recvd) { + + /* Verify that our message lies in the RPCSEC_GSS window */ + if (!req->rq_bytes_sent && rpcauth_xmit_need_reencode(task)) { + task->tk_status = -EBADMSG; + return; + } + if (list_empty(&req->rq_list) && rpc_reply_expected(task)) { /* * Add to the list only if we're expecting a reply -- cgit v1.2.3 From 7ebbbc6e7bd023903daa5bd95726edf2d60b559c Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Tue, 28 Aug 2018 09:00:27 -0400 Subject: SUNRPC: Simplify identification of when the message send/receive is complete Add states to indicate that the message send and receive are not yet complete. Signed-off-by: Trond Myklebust --- net/sunrpc/clnt.c | 19 +++++++------------ net/sunrpc/xprt.c | 17 ++++++++++++++--- 2 files changed, 21 insertions(+), 15 deletions(-) (limited to 'net') diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c index d41b5ac1d4e8..e5ac35e803ad 100644 --- a/net/sunrpc/clnt.c +++ b/net/sunrpc/clnt.c @@ -1156,6 +1156,7 @@ struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req) */ xbufp->len = xbufp->head[0].iov_len + xbufp->page_len + xbufp->tail[0].iov_len; + set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate); task->tk_action = call_bc_transmit; atomic_inc(&task->tk_count); @@ -1720,17 +1721,10 @@ call_allocate(struct rpc_task *task) rpc_exit(task, -ERESTARTSYS); } -static inline int +static int rpc_task_need_encode(struct rpc_task *task) { - return task->tk_rqstp->rq_snd_buf.len == 0; -} - -static inline void -rpc_task_force_reencode(struct rpc_task *task) -{ - task->tk_rqstp->rq_snd_buf.len = 0; - task->tk_rqstp->rq_bytes_sent = 0; + return test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) == 0; } /* @@ -1765,6 +1759,8 @@ rpc_xdr_encode(struct rpc_task *task) task->tk_status = rpcauth_wrap_req(task, encode, req, p, task->tk_msg.rpc_argp); + if (task->tk_status == 0) + set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate); } /* @@ -1999,7 +1995,6 @@ call_transmit_status(struct rpc_task *task) */ if (task->tk_status == 0) { xprt_end_transmit(task); - rpc_task_force_reencode(task); return; } @@ -2010,7 +2005,6 @@ call_transmit_status(struct rpc_task *task) default: dprint_status(task); xprt_end_transmit(task); - rpc_task_force_reencode(task); break; /* * Special cases: if we've been waiting on the @@ -2038,7 +2032,7 @@ call_transmit_status(struct rpc_task *task) case -EADDRINUSE: case -ENOTCONN: case -EPIPE: - rpc_task_force_reencode(task); + break; } } @@ -2185,6 +2179,7 @@ call_status(struct rpc_task *task) rpc_exit(task, status); break; case -EBADMSG: + clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate); task->tk_action = call_transmit; break; default: diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index 3973e10ea2bd..45d580cd93ac 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -936,10 +936,18 @@ void xprt_complete_rqst(struct rpc_task *task, int copied) /* req->rq_reply_bytes_recvd */ smp_wmb(); req->rq_reply_bytes_recvd = copied; + clear_bit(RPC_TASK_NEED_RECV, &task->tk_runstate); rpc_wake_up_queued_task(&xprt->pending, task); } EXPORT_SYMBOL_GPL(xprt_complete_rqst); +static bool +xprt_request_data_received(struct rpc_task *task) +{ + return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) && + task->tk_rqstp->rq_reply_bytes_recvd != 0; +} + static void xprt_timer(struct rpc_task *task) { struct rpc_rqst *req = task->tk_rqstp; @@ -1031,12 +1039,13 @@ void xprt_transmit(struct rpc_task *task) /* Add request to the receive list */ spin_lock(&xprt->recv_lock); list_add_tail(&req->rq_list, &xprt->recv); + set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate); spin_unlock(&xprt->recv_lock); xprt_reset_majortimeo(req); /* Turn off autodisconnect */ del_singleshot_timer_sync(&xprt->timer); } - } else if (!req->rq_bytes_sent) + } else if (xprt_request_data_received(task) && !req->rq_bytes_sent) return; connect_cookie = xprt->connect_cookie; @@ -1046,9 +1055,11 @@ void xprt_transmit(struct rpc_task *task) task->tk_status = status; return; } + xprt_inject_disconnect(xprt); dprintk("RPC: %5u xmit complete\n", task->tk_pid); + clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate); task->tk_flags |= RPC_TASK_SENT; spin_lock_bh(&xprt->transport_lock); @@ -1062,14 +1073,14 @@ void xprt_transmit(struct rpc_task *task) spin_unlock_bh(&xprt->transport_lock); req->rq_connect_cookie = connect_cookie; - if (rpc_reply_expected(task) && !READ_ONCE(req->rq_reply_bytes_recvd)) { + if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) { /* * Sleep on the pending queue if we're expecting a reply. * The spinlock ensures atomicity between the test of * req->rq_reply_bytes_recvd, and the call to rpc_sleep_on(). */ spin_lock(&xprt->recv_lock); - if (!req->rq_reply_bytes_recvd) { + if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) { rpc_sleep_on(&xprt->pending, task, xprt_timer); /* * Send an extra queue wakeup call if the -- cgit v1.2.3 From 3a03818fbee0a85196a2214f07db9e2b622e2550 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Tue, 14 Aug 2018 13:58:34 -0400 Subject: SUNRPC: Avoid holding locks across the XDR encoding of the RPC message Currently, we grab the socket bit lock before we allow the message to be XDR encoded. That significantly slows down the transmission rate, since we serialise on a potentially blocking operation. Signed-off-by: Trond Myklebust --- net/sunrpc/clnt.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'net') diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c index e5ac35e803ad..a858366cd15d 100644 --- a/net/sunrpc/clnt.c +++ b/net/sunrpc/clnt.c @@ -1949,9 +1949,6 @@ call_transmit(struct rpc_task *task) task->tk_action = call_status; if (task->tk_status < 0) return; - if (!xprt_prepare_transmit(task)) - return; - task->tk_action = call_transmit_status; /* Encode here so that rpcsec_gss can use correct sequence number. */ if (rpc_task_need_encode(task)) { rpc_xdr_encode(task); @@ -1965,6 +1962,9 @@ call_transmit(struct rpc_task *task) return; } } + if (!xprt_prepare_transmit(task)) + return; + task->tk_action = call_transmit_status; xprt_transmit(task); if (task->tk_status < 0) return; -- cgit v1.2.3 From d1109aa56c71e19fc117e75bff11384fc7279a3b Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Mon, 13 Aug 2018 15:48:42 -0400 Subject: SUNRPC: Rename TCP receive-specific state variables Since we will want to introduce similar TCP state variables for the transmission of requests, let's rename the existing ones to label that they are for the receive side. Signed-off-by: Trond Myklebust --- net/sunrpc/xprtsock.c | 178 +++++++++++++++++++++++++------------------------- 1 file changed, 89 insertions(+), 89 deletions(-) (limited to 'net') diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 6b7539c0466e..cd7d093721ae 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -1169,42 +1169,42 @@ static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, struct xdr_skb_rea size_t len, used; char *p; - p = ((char *) &transport->tcp_fraghdr) + transport->tcp_offset; - len = sizeof(transport->tcp_fraghdr) - transport->tcp_offset; + p = ((char *) &transport->recv.fraghdr) + transport->recv.offset; + len = sizeof(transport->recv.fraghdr) - transport->recv.offset; used = xdr_skb_read_bits(desc, p, len); - transport->tcp_offset += used; + transport->recv.offset += used; if (used != len) return; - transport->tcp_reclen = ntohl(transport->tcp_fraghdr); - if (transport->tcp_reclen & RPC_LAST_STREAM_FRAGMENT) - transport->tcp_flags |= TCP_RCV_LAST_FRAG; + transport->recv.len = ntohl(transport->recv.fraghdr); + if (transport->recv.len & RPC_LAST_STREAM_FRAGMENT) + transport->recv.flags |= TCP_RCV_LAST_FRAG; else - transport->tcp_flags &= ~TCP_RCV_LAST_FRAG; - transport->tcp_reclen &= RPC_FRAGMENT_SIZE_MASK; + transport->recv.flags &= ~TCP_RCV_LAST_FRAG; + transport->recv.len &= RPC_FRAGMENT_SIZE_MASK; - transport->tcp_flags &= ~TCP_RCV_COPY_FRAGHDR; - transport->tcp_offset = 0; + transport->recv.flags &= ~TCP_RCV_COPY_FRAGHDR; + transport->recv.offset = 0; /* Sanity check of the record length */ - if (unlikely(transport->tcp_reclen < 8)) { + if (unlikely(transport->recv.len < 8)) { dprintk("RPC: invalid TCP record fragment length\n"); xs_tcp_force_close(xprt); return; } dprintk("RPC: reading TCP record fragment of length %d\n", - transport->tcp_reclen); + transport->recv.len); } static void xs_tcp_check_fraghdr(struct sock_xprt *transport) { - if (transport->tcp_offset == transport->tcp_reclen) { - transport->tcp_flags |= TCP_RCV_COPY_FRAGHDR; - transport->tcp_offset = 0; - if (transport->tcp_flags & TCP_RCV_LAST_FRAG) { - transport->tcp_flags &= ~TCP_RCV_COPY_DATA; - transport->tcp_flags |= TCP_RCV_COPY_XID; - transport->tcp_copied = 0; + if (transport->recv.offset == transport->recv.len) { + transport->recv.flags |= TCP_RCV_COPY_FRAGHDR; + transport->recv.offset = 0; + if (transport->recv.flags & TCP_RCV_LAST_FRAG) { + transport->recv.flags &= ~TCP_RCV_COPY_DATA; + transport->recv.flags |= TCP_RCV_COPY_XID; + transport->recv.copied = 0; } } } @@ -1214,20 +1214,20 @@ static inline void xs_tcp_read_xid(struct sock_xprt *transport, struct xdr_skb_r size_t len, used; char *p; - len = sizeof(transport->tcp_xid) - transport->tcp_offset; + len = sizeof(transport->recv.xid) - transport->recv.offset; dprintk("RPC: reading XID (%zu bytes)\n", len); - p = ((char *) &transport->tcp_xid) + transport->tcp_offset; + p = ((char *) &transport->recv.xid) + transport->recv.offset; used = xdr_skb_read_bits(desc, p, len); - transport->tcp_offset += used; + transport->recv.offset += used; if (used != len) return; - transport->tcp_flags &= ~TCP_RCV_COPY_XID; - transport->tcp_flags |= TCP_RCV_READ_CALLDIR; - transport->tcp_copied = 4; + transport->recv.flags &= ~TCP_RCV_COPY_XID; + transport->recv.flags |= TCP_RCV_READ_CALLDIR; + transport->recv.copied = 4; dprintk("RPC: reading %s XID %08x\n", - (transport->tcp_flags & TCP_RPC_REPLY) ? "reply for" + (transport->recv.flags & TCP_RPC_REPLY) ? "reply for" : "request with", - ntohl(transport->tcp_xid)); + ntohl(transport->recv.xid)); xs_tcp_check_fraghdr(transport); } @@ -1239,34 +1239,34 @@ static inline void xs_tcp_read_calldir(struct sock_xprt *transport, char *p; /* - * We want transport->tcp_offset to be 8 at the end of this routine + * We want transport->recv.offset to be 8 at the end of this routine * (4 bytes for the xid and 4 bytes for the call/reply flag). * When this function is called for the first time, - * transport->tcp_offset is 4 (after having already read the xid). + * transport->recv.offset is 4 (after having already read the xid). */ - offset = transport->tcp_offset - sizeof(transport->tcp_xid); - len = sizeof(transport->tcp_calldir) - offset; + offset = transport->recv.offset - sizeof(transport->recv.xid); + len = sizeof(transport->recv.calldir) - offset; dprintk("RPC: reading CALL/REPLY flag (%zu bytes)\n", len); - p = ((char *) &transport->tcp_calldir) + offset; + p = ((char *) &transport->recv.calldir) + offset; used = xdr_skb_read_bits(desc, p, len); - transport->tcp_offset += used; + transport->recv.offset += used; if (used != len) return; - transport->tcp_flags &= ~TCP_RCV_READ_CALLDIR; + transport->recv.flags &= ~TCP_RCV_READ_CALLDIR; /* * We don't yet have the XDR buffer, so we will write the calldir * out after we get the buffer from the 'struct rpc_rqst' */ - switch (ntohl(transport->tcp_calldir)) { + switch (ntohl(transport->recv.calldir)) { case RPC_REPLY: - transport->tcp_flags |= TCP_RCV_COPY_CALLDIR; - transport->tcp_flags |= TCP_RCV_COPY_DATA; - transport->tcp_flags |= TCP_RPC_REPLY; + transport->recv.flags |= TCP_RCV_COPY_CALLDIR; + transport->recv.flags |= TCP_RCV_COPY_DATA; + transport->recv.flags |= TCP_RPC_REPLY; break; case RPC_CALL: - transport->tcp_flags |= TCP_RCV_COPY_CALLDIR; - transport->tcp_flags |= TCP_RCV_COPY_DATA; - transport->tcp_flags &= ~TCP_RPC_REPLY; + transport->recv.flags |= TCP_RCV_COPY_CALLDIR; + transport->recv.flags |= TCP_RCV_COPY_DATA; + transport->recv.flags &= ~TCP_RPC_REPLY; break; default: dprintk("RPC: invalid request message type\n"); @@ -1287,21 +1287,21 @@ static inline void xs_tcp_read_common(struct rpc_xprt *xprt, rcvbuf = &req->rq_private_buf; - if (transport->tcp_flags & TCP_RCV_COPY_CALLDIR) { + if (transport->recv.flags & TCP_RCV_COPY_CALLDIR) { /* * Save the RPC direction in the XDR buffer */ - memcpy(rcvbuf->head[0].iov_base + transport->tcp_copied, - &transport->tcp_calldir, - sizeof(transport->tcp_calldir)); - transport->tcp_copied += sizeof(transport->tcp_calldir); - transport->tcp_flags &= ~TCP_RCV_COPY_CALLDIR; + memcpy(rcvbuf->head[0].iov_base + transport->recv.copied, + &transport->recv.calldir, + sizeof(transport->recv.calldir)); + transport->recv.copied += sizeof(transport->recv.calldir); + transport->recv.flags &= ~TCP_RCV_COPY_CALLDIR; } len = desc->count; - if (len > transport->tcp_reclen - transport->tcp_offset) - desc->count = transport->tcp_reclen - transport->tcp_offset; - r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied, + if (len > transport->recv.len - transport->recv.offset) + desc->count = transport->recv.len - transport->recv.offset; + r = xdr_partial_copy_from_skb(rcvbuf, transport->recv.copied, desc, xdr_skb_read_bits); if (desc->count) { @@ -1314,31 +1314,31 @@ static inline void xs_tcp_read_common(struct rpc_xprt *xprt, * Any remaining data from this record will * be discarded. */ - transport->tcp_flags &= ~TCP_RCV_COPY_DATA; + transport->recv.flags &= ~TCP_RCV_COPY_DATA; dprintk("RPC: XID %08x truncated request\n", - ntohl(transport->tcp_xid)); - dprintk("RPC: xprt = %p, tcp_copied = %lu, " - "tcp_offset = %u, tcp_reclen = %u\n", - xprt, transport->tcp_copied, - transport->tcp_offset, transport->tcp_reclen); + ntohl(transport->recv.xid)); + dprintk("RPC: xprt = %p, recv.copied = %lu, " + "recv.offset = %u, recv.len = %u\n", + xprt, transport->recv.copied, + transport->recv.offset, transport->recv.len); return; } - transport->tcp_copied += r; - transport->tcp_offset += r; + transport->recv.copied += r; + transport->recv.offset += r; desc->count = len - r; dprintk("RPC: XID %08x read %zd bytes\n", - ntohl(transport->tcp_xid), r); - dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, " - "tcp_reclen = %u\n", xprt, transport->tcp_copied, - transport->tcp_offset, transport->tcp_reclen); - - if (transport->tcp_copied == req->rq_private_buf.buflen) - transport->tcp_flags &= ~TCP_RCV_COPY_DATA; - else if (transport->tcp_offset == transport->tcp_reclen) { - if (transport->tcp_flags & TCP_RCV_LAST_FRAG) - transport->tcp_flags &= ~TCP_RCV_COPY_DATA; + ntohl(transport->recv.xid), r); + dprintk("RPC: xprt = %p, recv.copied = %lu, recv.offset = %u, " + "recv.len = %u\n", xprt, transport->recv.copied, + transport->recv.offset, transport->recv.len); + + if (transport->recv.copied == req->rq_private_buf.buflen) + transport->recv.flags &= ~TCP_RCV_COPY_DATA; + else if (transport->recv.offset == transport->recv.len) { + if (transport->recv.flags & TCP_RCV_LAST_FRAG) + transport->recv.flags &= ~TCP_RCV_COPY_DATA; } } @@ -1353,14 +1353,14 @@ static inline int xs_tcp_read_reply(struct rpc_xprt *xprt, container_of(xprt, struct sock_xprt, xprt); struct rpc_rqst *req; - dprintk("RPC: read reply XID %08x\n", ntohl(transport->tcp_xid)); + dprintk("RPC: read reply XID %08x\n", ntohl(transport->recv.xid)); /* Find and lock the request corresponding to this xid */ spin_lock(&xprt->recv_lock); - req = xprt_lookup_rqst(xprt, transport->tcp_xid); + req = xprt_lookup_rqst(xprt, transport->recv.xid); if (!req) { dprintk("RPC: XID %08x request not found!\n", - ntohl(transport->tcp_xid)); + ntohl(transport->recv.xid)); spin_unlock(&xprt->recv_lock); return -1; } @@ -1370,8 +1370,8 @@ static inline int xs_tcp_read_reply(struct rpc_xprt *xprt, xs_tcp_read_common(xprt, desc, req); spin_lock(&xprt->recv_lock); - if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) - xprt_complete_rqst(req->rq_task, transport->tcp_copied); + if (!(transport->recv.flags & TCP_RCV_COPY_DATA)) + xprt_complete_rqst(req->rq_task, transport->recv.copied); xprt_unpin_rqst(req); spin_unlock(&xprt->recv_lock); return 0; @@ -1393,7 +1393,7 @@ static int xs_tcp_read_callback(struct rpc_xprt *xprt, struct rpc_rqst *req; /* Look up the request corresponding to the given XID */ - req = xprt_lookup_bc_request(xprt, transport->tcp_xid); + req = xprt_lookup_bc_request(xprt, transport->recv.xid); if (req == NULL) { printk(KERN_WARNING "Callback slot table overflowed\n"); xprt_force_disconnect(xprt); @@ -1403,8 +1403,8 @@ static int xs_tcp_read_callback(struct rpc_xprt *xprt, dprintk("RPC: read callback XID %08x\n", ntohl(req->rq_xid)); xs_tcp_read_common(xprt, desc, req); - if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) - xprt_complete_bc_request(req, transport->tcp_copied); + if (!(transport->recv.flags & TCP_RCV_COPY_DATA)) + xprt_complete_bc_request(req, transport->recv.copied); return 0; } @@ -1415,7 +1415,7 @@ static inline int _xs_tcp_read_data(struct rpc_xprt *xprt, struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); - return (transport->tcp_flags & TCP_RPC_REPLY) ? + return (transport->recv.flags & TCP_RPC_REPLY) ? xs_tcp_read_reply(xprt, desc) : xs_tcp_read_callback(xprt, desc); } @@ -1458,9 +1458,9 @@ static void xs_tcp_read_data(struct rpc_xprt *xprt, else { /* * The transport_lock protects the request handling. - * There's no need to hold it to update the tcp_flags. + * There's no need to hold it to update the recv.flags. */ - transport->tcp_flags &= ~TCP_RCV_COPY_DATA; + transport->recv.flags &= ~TCP_RCV_COPY_DATA; } } @@ -1468,12 +1468,12 @@ static inline void xs_tcp_read_discard(struct sock_xprt *transport, struct xdr_s { size_t len; - len = transport->tcp_reclen - transport->tcp_offset; + len = transport->recv.len - transport->recv.offset; if (len > desc->count) len = desc->count; desc->count -= len; desc->offset += len; - transport->tcp_offset += len; + transport->recv.offset += len; dprintk("RPC: discarded %zu bytes\n", len); xs_tcp_check_fraghdr(transport); } @@ -1494,22 +1494,22 @@ static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, uns trace_xs_tcp_data_recv(transport); /* Read in a new fragment marker if necessary */ /* Can we ever really expect to get completely empty fragments? */ - if (transport->tcp_flags & TCP_RCV_COPY_FRAGHDR) { + if (transport->recv.flags & TCP_RCV_COPY_FRAGHDR) { xs_tcp_read_fraghdr(xprt, &desc); continue; } /* Read in the xid if necessary */ - if (transport->tcp_flags & TCP_RCV_COPY_XID) { + if (transport->recv.flags & TCP_RCV_COPY_XID) { xs_tcp_read_xid(transport, &desc); continue; } /* Read in the call/reply flag */ - if (transport->tcp_flags & TCP_RCV_READ_CALLDIR) { + if (transport->recv.flags & TCP_RCV_READ_CALLDIR) { xs_tcp_read_calldir(transport, &desc); continue; } /* Read in the request data */ - if (transport->tcp_flags & TCP_RCV_COPY_DATA) { + if (transport->recv.flags & TCP_RCV_COPY_DATA) { xs_tcp_read_data(xprt, &desc); continue; } @@ -1602,10 +1602,10 @@ static void xs_tcp_state_change(struct sock *sk) if (!xprt_test_and_set_connected(xprt)) { /* Reset TCP record info */ - transport->tcp_offset = 0; - transport->tcp_reclen = 0; - transport->tcp_copied = 0; - transport->tcp_flags = + transport->recv.offset = 0; + transport->recv.len = 0; + transport->recv.copied = 0; + transport->recv.flags = TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID; xprt->connect_cookie++; clear_bit(XPRT_SOCK_CONNECTING, &transport->sock_state); -- cgit v1.2.3 From e1806c7bfb803408df4dc53dfe502ffab0f46a67 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Mon, 13 Aug 2018 16:50:49 -0400 Subject: SUNRPC: Move reset of TCP state variables into the reconnect code Rather than resetting state variables in socket state_change() callback, do it in the sunrpc TCP connect function itself. Signed-off-by: Trond Myklebust --- net/sunrpc/xprtsock.c | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) (limited to 'net') diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index cd7d093721ae..ec1e3f93e707 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -1600,13 +1600,6 @@ static void xs_tcp_state_change(struct sock *sk) case TCP_ESTABLISHED: spin_lock(&xprt->transport_lock); if (!xprt_test_and_set_connected(xprt)) { - - /* Reset TCP record info */ - transport->recv.offset = 0; - transport->recv.len = 0; - transport->recv.copied = 0; - transport->recv.flags = - TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID; xprt->connect_cookie++; clear_bit(XPRT_SOCK_CONNECTING, &transport->sock_state); xprt_clear_connecting(xprt); @@ -2386,6 +2379,12 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) xs_set_memalloc(xprt); + /* Reset TCP record info */ + transport->recv.offset = 0; + transport->recv.len = 0; + transport->recv.copied = 0; + transport->recv.flags = TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID; + /* Tell the socket layer to start connecting... */ xprt->stat.connect_count++; xprt->stat.connect_start = jiffies; -- cgit v1.2.3 From 6c7a64e5a44dbc6d073b83a56a48d0a4099f1dd2 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Mon, 13 Aug 2018 16:54:57 -0400 Subject: SUNRPC: Add socket transmit queue offset tracking Signed-off-by: Trond Myklebust --- net/sunrpc/xprtsock.c | 40 ++++++++++++++++++++++------------------ 1 file changed, 22 insertions(+), 18 deletions(-) (limited to 'net') diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index ec1e3f93e707..629cc45e1e6c 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -461,7 +461,7 @@ static int xs_nospace(struct rpc_task *task) int ret = -EAGAIN; dprintk("RPC: %5u xmit incomplete (%u left of %u)\n", - task->tk_pid, req->rq_slen - req->rq_bytes_sent, + task->tk_pid, req->rq_slen - transport->xmit.offset, req->rq_slen); /* Protect against races with write_space */ @@ -528,19 +528,22 @@ static int xs_local_send_request(struct rpc_task *task) req->rq_svec->iov_base, req->rq_svec->iov_len); req->rq_xtime = ktime_get(); - status = xs_sendpages(transport->sock, NULL, 0, xdr, req->rq_bytes_sent, + status = xs_sendpages(transport->sock, NULL, 0, xdr, + transport->xmit.offset, true, &sent); dprintk("RPC: %s(%u) = %d\n", - __func__, xdr->len - req->rq_bytes_sent, status); + __func__, xdr->len - transport->xmit.offset, status); if (status == -EAGAIN && sock_writeable(transport->inet)) status = -ENOBUFS; if (likely(sent > 0) || status == 0) { - req->rq_bytes_sent += sent; - req->rq_xmit_bytes_sent += sent; + transport->xmit.offset += sent; + req->rq_bytes_sent = transport->xmit.offset; if (likely(req->rq_bytes_sent >= req->rq_slen)) { + req->rq_xmit_bytes_sent += transport->xmit.offset; req->rq_bytes_sent = 0; + transport->xmit.offset = 0; return 0; } status = -EAGAIN; @@ -592,10 +595,10 @@ static int xs_udp_send_request(struct rpc_task *task) return -ENOTCONN; req->rq_xtime = ktime_get(); status = xs_sendpages(transport->sock, xs_addr(xprt), xprt->addrlen, - xdr, req->rq_bytes_sent, true, &sent); + xdr, 0, true, &sent); dprintk("RPC: xs_udp_send_request(%u) = %d\n", - xdr->len - req->rq_bytes_sent, status); + xdr->len, status); /* firewall is blocking us, don't return -EAGAIN or we end up looping */ if (status == -EPERM) @@ -684,17 +687,20 @@ static int xs_tcp_send_request(struct rpc_task *task) while (1) { sent = 0; status = xs_sendpages(transport->sock, NULL, 0, xdr, - req->rq_bytes_sent, zerocopy, &sent); + transport->xmit.offset, + zerocopy, &sent); dprintk("RPC: xs_tcp_send_request(%u) = %d\n", - xdr->len - req->rq_bytes_sent, status); + xdr->len - transport->xmit.offset, status); /* If we've sent the entire packet, immediately * reset the count of bytes sent. */ - req->rq_bytes_sent += sent; - req->rq_xmit_bytes_sent += sent; + transport->xmit.offset += sent; + req->rq_bytes_sent = transport->xmit.offset; if (likely(req->rq_bytes_sent >= req->rq_slen)) { + req->rq_xmit_bytes_sent += transport->xmit.offset; req->rq_bytes_sent = 0; + transport->xmit.offset = 0; return 0; } @@ -760,18 +766,13 @@ static int xs_tcp_send_request(struct rpc_task *task) */ static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task) { - struct rpc_rqst *req; + struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); if (task != xprt->snd_task) return; if (task == NULL) goto out_release; - req = task->tk_rqstp; - if (req == NULL) - goto out_release; - if (req->rq_bytes_sent == 0) - goto out_release; - if (req->rq_bytes_sent == req->rq_snd_buf.len) + if (transport->xmit.offset == 0 || !xprt_connected(xprt)) goto out_release; set_bit(XPRT_CLOSE_WAIT, &xprt->state); out_release: @@ -2021,6 +2022,8 @@ static int xs_local_finish_connecting(struct rpc_xprt *xprt, write_unlock_bh(&sk->sk_callback_lock); } + transport->xmit.offset = 0; + /* Tell the socket layer to start connecting... */ xprt->stat.connect_count++; xprt->stat.connect_start = jiffies; @@ -2384,6 +2387,7 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) transport->recv.len = 0; transport->recv.copied = 0; transport->recv.flags = TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID; + transport->xmit.offset = 0; /* Tell the socket layer to start connecting... */ xprt->stat.connect_count++; -- cgit v1.2.3 From 4cd34e7c2e412e3db2f6bf7371581ab60591174b Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Fri, 31 Aug 2018 10:00:02 -0400 Subject: SUNRPC: Simplify dealing with aborted partially transmitted messages If the previous message was only partially transmitted, we need to close the socket in order to avoid corruption of the message stream. To do so, we currently hijack the unlocking of the socket in order to schedule the close. Now that we track the message offset in the socket state, we can move that kind of checking out of the socket lock code, which is needed to allow messages to remain queued after dropping the socket lock. Signed-off-by: Trond Myklebust --- net/sunrpc/xprtsock.c | 51 +++++++++++++++++++++++++-------------------------- 1 file changed, 25 insertions(+), 26 deletions(-) (limited to 'net') diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 629cc45e1e6c..3fbccebd0b10 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -491,6 +491,16 @@ static int xs_nospace(struct rpc_task *task) return ret; } +/* + * Determine if the previous message in the stream was aborted before it + * could complete transmission. + */ +static bool +xs_send_request_was_aborted(struct sock_xprt *transport, struct rpc_rqst *req) +{ + return transport->xmit.offset != 0 && req->rq_bytes_sent == 0; +} + /* * Construct a stream transport record marker in @buf. */ @@ -522,6 +532,12 @@ static int xs_local_send_request(struct rpc_task *task) int status; int sent = 0; + /* Close the stream if the previous transmission was incomplete */ + if (xs_send_request_was_aborted(transport, req)) { + xs_close(xprt); + return -ENOTCONN; + } + xs_encode_stream_record_marker(&req->rq_snd_buf); xs_pktdump("packet data:", @@ -665,6 +681,13 @@ static int xs_tcp_send_request(struct rpc_task *task) int status; int sent; + /* Close the stream if the previous transmission was incomplete */ + if (xs_send_request_was_aborted(transport, req)) { + if (transport->sock != NULL) + kernel_sock_shutdown(transport->sock, SHUT_RDWR); + return -ENOTCONN; + } + xs_encode_stream_record_marker(&req->rq_snd_buf); xs_pktdump("packet data:", @@ -755,30 +778,6 @@ static int xs_tcp_send_request(struct rpc_task *task) return status; } -/** - * xs_tcp_release_xprt - clean up after a tcp transmission - * @xprt: transport - * @task: rpc task - * - * This cleans up if an error causes us to abort the transmission of a request. - * In this case, the socket may need to be reset in order to avoid confusing - * the server. - */ -static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task) -{ - struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); - - if (task != xprt->snd_task) - return; - if (task == NULL) - goto out_release; - if (transport->xmit.offset == 0 || !xprt_connected(xprt)) - goto out_release; - set_bit(XPRT_CLOSE_WAIT, &xprt->state); -out_release: - xprt_release_xprt(xprt, task); -} - static void xs_save_old_callbacks(struct sock_xprt *transport, struct sock *sk) { transport->old_data_ready = sk->sk_data_ready; @@ -2764,7 +2763,7 @@ static void bc_destroy(struct rpc_xprt *xprt) static const struct rpc_xprt_ops xs_local_ops = { .reserve_xprt = xprt_reserve_xprt, - .release_xprt = xs_tcp_release_xprt, + .release_xprt = xprt_release_xprt, .alloc_slot = xprt_alloc_slot, .free_slot = xprt_free_slot, .rpcbind = xs_local_rpcbind, @@ -2806,7 +2805,7 @@ static const struct rpc_xprt_ops xs_udp_ops = { static const struct rpc_xprt_ops xs_tcp_ops = { .reserve_xprt = xprt_reserve_xprt, - .release_xprt = xs_tcp_release_xprt, + .release_xprt = xprt_release_xprt, .alloc_slot = xprt_lock_and_alloc_slot, .free_slot = xprt_free_slot, .rpcbind = rpcb_getport_async, -- cgit v1.2.3 From cf9946cd6144410ced00d52586ff5a2cb4868fc5 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Mon, 6 Aug 2018 12:55:34 -0400 Subject: SUNRPC: Refactor the transport request pinning We are going to need to pin for both send and receive. Signed-off-by: Trond Myklebust --- net/sunrpc/xprt.c | 43 +++++++++++++++++++++++-------------------- 1 file changed, 23 insertions(+), 20 deletions(-) (limited to 'net') diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index 45d580cd93ac..649a40cfae6d 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -847,16 +847,22 @@ struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid) } EXPORT_SYMBOL_GPL(xprt_lookup_rqst); +static bool +xprt_is_pinned_rqst(struct rpc_rqst *req) +{ + return atomic_read(&req->rq_pin) != 0; +} + /** * xprt_pin_rqst - Pin a request on the transport receive list * @req: Request to pin * * Caller must ensure this is atomic with the call to xprt_lookup_rqst() - * so should be holding the xprt transport lock. + * so should be holding the xprt receive lock. */ void xprt_pin_rqst(struct rpc_rqst *req) { - set_bit(RPC_TASK_MSG_RECV, &req->rq_task->tk_runstate); + atomic_inc(&req->rq_pin); } EXPORT_SYMBOL_GPL(xprt_pin_rqst); @@ -864,31 +870,22 @@ EXPORT_SYMBOL_GPL(xprt_pin_rqst); * xprt_unpin_rqst - Unpin a request on the transport receive list * @req: Request to pin * - * Caller should be holding the xprt transport lock. + * Caller should be holding the xprt receive lock. */ void xprt_unpin_rqst(struct rpc_rqst *req) { - struct rpc_task *task = req->rq_task; - - clear_bit(RPC_TASK_MSG_RECV, &task->tk_runstate); - if (test_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate)) - wake_up_bit(&task->tk_runstate, RPC_TASK_MSG_RECV); + if (!test_bit(RPC_TASK_MSG_PIN_WAIT, &req->rq_task->tk_runstate)) { + atomic_dec(&req->rq_pin); + return; + } + if (atomic_dec_and_test(&req->rq_pin)) + wake_up_var(&req->rq_pin); } EXPORT_SYMBOL_GPL(xprt_unpin_rqst); static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req) -__must_hold(&req->rq_xprt->recv_lock) { - struct rpc_task *task = req->rq_task; - - if (task && test_bit(RPC_TASK_MSG_RECV, &task->tk_runstate)) { - spin_unlock(&req->rq_xprt->recv_lock); - set_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate); - wait_on_bit(&task->tk_runstate, RPC_TASK_MSG_RECV, - TASK_UNINTERRUPTIBLE); - clear_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate); - spin_lock(&req->rq_xprt->recv_lock); - } + wait_var_event(&req->rq_pin, !xprt_is_pinned_rqst(req)); } /** @@ -1388,7 +1385,13 @@ void xprt_release(struct rpc_task *task) spin_lock(&xprt->recv_lock); if (!list_empty(&req->rq_list)) { list_del_init(&req->rq_list); - xprt_wait_on_pinned_rqst(req); + if (xprt_is_pinned_rqst(req)) { + set_bit(RPC_TASK_MSG_PIN_WAIT, &req->rq_task->tk_runstate); + spin_unlock(&xprt->recv_lock); + xprt_wait_on_pinned_rqst(req); + spin_lock(&xprt->recv_lock); + clear_bit(RPC_TASK_MSG_PIN_WAIT, &req->rq_task->tk_runstate); + } } spin_unlock(&xprt->recv_lock); spin_lock_bh(&xprt->transport_lock); -- cgit v1.2.3 From 359c48c04af25397ecefec1ccf200ddd199617ce Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Wed, 29 Aug 2018 09:22:28 -0400 Subject: SUNRPC: Add a helper to wake up a sleeping rpc_task and set its status Add a helper that will wake up a task that is sleeping on a specific queue, and will set the value of task->tk_status. This is mainly intended for use by the transport layer to notify the task of an error condition. Signed-off-by: Trond Myklebust --- net/sunrpc/sched.c | 65 +++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 55 insertions(+), 10 deletions(-) (limited to 'net') diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c index 3fe5d60ab0e2..dec01bd1b71c 100644 --- a/net/sunrpc/sched.c +++ b/net/sunrpc/sched.c @@ -440,14 +440,28 @@ static void __rpc_do_wake_up_task_on_wq(struct workqueue_struct *wq, /* * Wake up a queued task while the queue lock is being held */ -static void rpc_wake_up_task_on_wq_queue_locked(struct workqueue_struct *wq, - struct rpc_wait_queue *queue, struct rpc_task *task) +static struct rpc_task * +rpc_wake_up_task_on_wq_queue_action_locked(struct workqueue_struct *wq, + struct rpc_wait_queue *queue, struct rpc_task *task, + bool (*action)(struct rpc_task *, void *), void *data) { if (RPC_IS_QUEUED(task)) { smp_rmb(); - if (task->tk_waitqueue == queue) - __rpc_do_wake_up_task_on_wq(wq, queue, task); + if (task->tk_waitqueue == queue) { + if (action == NULL || action(task, data)) { + __rpc_do_wake_up_task_on_wq(wq, queue, task); + return task; + } + } } + return NULL; +} + +static void +rpc_wake_up_task_on_wq_queue_locked(struct workqueue_struct *wq, + struct rpc_wait_queue *queue, struct rpc_task *task) +{ + rpc_wake_up_task_on_wq_queue_action_locked(wq, queue, task, NULL, NULL); } /* @@ -481,6 +495,40 @@ void rpc_wake_up_queued_task(struct rpc_wait_queue *queue, struct rpc_task *task } EXPORT_SYMBOL_GPL(rpc_wake_up_queued_task); +static bool rpc_task_action_set_status(struct rpc_task *task, void *status) +{ + task->tk_status = *(int *)status; + return true; +} + +static void +rpc_wake_up_task_queue_set_status_locked(struct rpc_wait_queue *queue, + struct rpc_task *task, int status) +{ + rpc_wake_up_task_on_wq_queue_action_locked(rpciod_workqueue, queue, + task, rpc_task_action_set_status, &status); +} + +/** + * rpc_wake_up_queued_task_set_status - wake up a task and set task->tk_status + * @queue: pointer to rpc_wait_queue + * @task: pointer to rpc_task + * @status: integer error value + * + * If @task is queued on @queue, then it is woken up, and @task->tk_status is + * set to the value of @status. + */ +void +rpc_wake_up_queued_task_set_status(struct rpc_wait_queue *queue, + struct rpc_task *task, int status) +{ + if (!RPC_IS_QUEUED(task)) + return; + spin_lock_bh(&queue->lock); + rpc_wake_up_task_queue_set_status_locked(queue, task, status); + spin_unlock_bh(&queue->lock); +} + /* * Wake up the next task on a priority queue. */ @@ -553,12 +601,9 @@ struct rpc_task *rpc_wake_up_first_on_wq(struct workqueue_struct *wq, queue, rpc_qname(queue)); spin_lock_bh(&queue->lock); task = __rpc_find_next_queued(queue); - if (task != NULL) { - if (func(task, data)) - rpc_wake_up_task_on_wq_queue_locked(wq, queue, task); - else - task = NULL; - } + if (task != NULL) + task = rpc_wake_up_task_on_wq_queue_action_locked(wq, queue, + task, func, data); spin_unlock_bh(&queue->lock); return task; -- cgit v1.2.3 From 5ce970393bad41499d50dfaea525ac8f01cdbc30 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Fri, 7 Sep 2018 23:15:35 -0400 Subject: SUNRPC: Test whether the task is queued before grabbing the queue spinlocks When asked to wake up an RPC task, it makes sense to test whether or not the task is still queued. Signed-off-by: Trond Myklebust --- net/sunrpc/sched.c | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'net') diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c index dec01bd1b71c..9a8ec012b449 100644 --- a/net/sunrpc/sched.c +++ b/net/sunrpc/sched.c @@ -479,6 +479,8 @@ void rpc_wake_up_queued_task_on_wq(struct workqueue_struct *wq, struct rpc_wait_queue *queue, struct rpc_task *task) { + if (!RPC_IS_QUEUED(task)) + return; spin_lock_bh(&queue->lock); rpc_wake_up_task_on_wq_queue_locked(wq, queue, task); spin_unlock_bh(&queue->lock); @@ -489,6 +491,8 @@ void rpc_wake_up_queued_task_on_wq(struct workqueue_struct *wq, */ void rpc_wake_up_queued_task(struct rpc_wait_queue *queue, struct rpc_task *task) { + if (!RPC_IS_QUEUED(task)) + return; spin_lock_bh(&queue->lock); rpc_wake_up_task_queue_locked(queue, task); spin_unlock_bh(&queue->lock); -- cgit v1.2.3 From ec37a58fba289d53f35442ad0ef3b469412efd20 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Wed, 29 Aug 2018 09:30:19 -0400 Subject: SUNRPC: Don't wake queued RPC calls multiple times in xprt_transmit Rather than waking up the entire queue of RPC messages a second time, just wake up the task that was put to sleep. Signed-off-by: Trond Myklebust --- net/sunrpc/xprt.c | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) (limited to 'net') diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index 649a40cfae6d..3a3b3445a7c0 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -1079,13 +1079,10 @@ void xprt_transmit(struct rpc_task *task) spin_lock(&xprt->recv_lock); if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) { rpc_sleep_on(&xprt->pending, task, xprt_timer); - /* - * Send an extra queue wakeup call if the - * connection was dropped in case the call to - * rpc_sleep_on() raced. - */ + /* Wake up immediately if the connection was dropped */ if (!xprt_connected(xprt)) - xprt_wake_pending_tasks(xprt, -ENOTCONN); + rpc_wake_up_queued_task_set_status(&xprt->pending, + task, -ENOTCONN); } spin_unlock(&xprt->recv_lock); } -- cgit v1.2.3 From 75c84151a9dc7a755c607e6761d8f14a1690dbf0 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Fri, 31 Aug 2018 10:21:00 -0400 Subject: SUNRPC: Rename xprt->recv_lock to xprt->queue_lock We will use the same lock to protect both the transmit and receive queues. Signed-off-by: Trond Myklebust --- net/sunrpc/svcsock.c | 6 +++--- net/sunrpc/xprt.c | 24 ++++++++++++------------ net/sunrpc/xprtrdma/rpc_rdma.c | 10 +++++----- net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 4 ++-- net/sunrpc/xprtsock.c | 30 +++++++++++++++--------------- 5 files changed, 37 insertions(+), 37 deletions(-) (limited to 'net') diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c index 5445145e639c..db8bb6b3a2b0 100644 --- a/net/sunrpc/svcsock.c +++ b/net/sunrpc/svcsock.c @@ -1004,7 +1004,7 @@ static int receive_cb_reply(struct svc_sock *svsk, struct svc_rqst *rqstp) if (!bc_xprt) return -EAGAIN; - spin_lock(&bc_xprt->recv_lock); + spin_lock(&bc_xprt->queue_lock); req = xprt_lookup_rqst(bc_xprt, xid); if (!req) goto unlock_notfound; @@ -1022,7 +1022,7 @@ static int receive_cb_reply(struct svc_sock *svsk, struct svc_rqst *rqstp) memcpy(dst->iov_base, src->iov_base, src->iov_len); xprt_complete_rqst(req->rq_task, rqstp->rq_arg.len); rqstp->rq_arg.len = 0; - spin_unlock(&bc_xprt->recv_lock); + spin_unlock(&bc_xprt->queue_lock); return 0; unlock_notfound: printk(KERN_NOTICE @@ -1031,7 +1031,7 @@ unlock_notfound: __func__, ntohl(calldir), bc_xprt, ntohl(xid)); unlock_eagain: - spin_unlock(&bc_xprt->recv_lock); + spin_unlock(&bc_xprt->queue_lock); return -EAGAIN; } diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index 3a3b3445a7c0..6e3d4b4ee79e 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -826,7 +826,7 @@ static void xprt_connect_status(struct rpc_task *task) * @xprt: transport on which the original request was transmitted * @xid: RPC XID of incoming reply * - * Caller holds xprt->recv_lock. + * Caller holds xprt->queue_lock. */ struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid) { @@ -892,7 +892,7 @@ static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req) * xprt_update_rtt - Update RPC RTT statistics * @task: RPC request that recently completed * - * Caller holds xprt->recv_lock. + * Caller holds xprt->queue_lock. */ void xprt_update_rtt(struct rpc_task *task) { @@ -914,7 +914,7 @@ EXPORT_SYMBOL_GPL(xprt_update_rtt); * @task: RPC request that recently completed * @copied: actual number of bytes received from the transport * - * Caller holds xprt->recv_lock. + * Caller holds xprt->queue_lock. */ void xprt_complete_rqst(struct rpc_task *task, int copied) { @@ -1034,10 +1034,10 @@ void xprt_transmit(struct rpc_task *task) memcpy(&req->rq_private_buf, &req->rq_rcv_buf, sizeof(req->rq_private_buf)); /* Add request to the receive list */ - spin_lock(&xprt->recv_lock); + spin_lock(&xprt->queue_lock); list_add_tail(&req->rq_list, &xprt->recv); set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate); - spin_unlock(&xprt->recv_lock); + spin_unlock(&xprt->queue_lock); xprt_reset_majortimeo(req); /* Turn off autodisconnect */ del_singleshot_timer_sync(&xprt->timer); @@ -1076,7 +1076,7 @@ void xprt_transmit(struct rpc_task *task) * The spinlock ensures atomicity between the test of * req->rq_reply_bytes_recvd, and the call to rpc_sleep_on(). */ - spin_lock(&xprt->recv_lock); + spin_lock(&xprt->queue_lock); if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) { rpc_sleep_on(&xprt->pending, task, xprt_timer); /* Wake up immediately if the connection was dropped */ @@ -1084,7 +1084,7 @@ void xprt_transmit(struct rpc_task *task) rpc_wake_up_queued_task_set_status(&xprt->pending, task, -ENOTCONN); } - spin_unlock(&xprt->recv_lock); + spin_unlock(&xprt->queue_lock); } } @@ -1379,18 +1379,18 @@ void xprt_release(struct rpc_task *task) task->tk_ops->rpc_count_stats(task, task->tk_calldata); else if (task->tk_client) rpc_count_iostats(task, task->tk_client->cl_metrics); - spin_lock(&xprt->recv_lock); + spin_lock(&xprt->queue_lock); if (!list_empty(&req->rq_list)) { list_del_init(&req->rq_list); if (xprt_is_pinned_rqst(req)) { set_bit(RPC_TASK_MSG_PIN_WAIT, &req->rq_task->tk_runstate); - spin_unlock(&xprt->recv_lock); + spin_unlock(&xprt->queue_lock); xprt_wait_on_pinned_rqst(req); - spin_lock(&xprt->recv_lock); + spin_lock(&xprt->queue_lock); clear_bit(RPC_TASK_MSG_PIN_WAIT, &req->rq_task->tk_runstate); } } - spin_unlock(&xprt->recv_lock); + spin_unlock(&xprt->queue_lock); spin_lock_bh(&xprt->transport_lock); xprt->ops->release_xprt(xprt, task); if (xprt->ops->release_request) @@ -1420,7 +1420,7 @@ static void xprt_init(struct rpc_xprt *xprt, struct net *net) spin_lock_init(&xprt->transport_lock); spin_lock_init(&xprt->reserve_lock); - spin_lock_init(&xprt->recv_lock); + spin_lock_init(&xprt->queue_lock); INIT_LIST_HEAD(&xprt->free); INIT_LIST_HEAD(&xprt->recv); diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index c8ae983c6cc0..0020dc401215 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -1238,7 +1238,7 @@ void rpcrdma_complete_rqst(struct rpcrdma_rep *rep) goto out_badheader; out: - spin_lock(&xprt->recv_lock); + spin_lock(&xprt->queue_lock); cwnd = xprt->cwnd; xprt->cwnd = r_xprt->rx_buf.rb_credits << RPC_CWNDSHIFT; if (xprt->cwnd > cwnd) @@ -1246,7 +1246,7 @@ out: xprt_complete_rqst(rqst->rq_task, status); xprt_unpin_rqst(rqst); - spin_unlock(&xprt->recv_lock); + spin_unlock(&xprt->queue_lock); return; /* If the incoming reply terminated a pending RPC, the next @@ -1345,7 +1345,7 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep) /* Match incoming rpcrdma_rep to an rpcrdma_req to * get context for handling any incoming chunks. */ - spin_lock(&xprt->recv_lock); + spin_lock(&xprt->queue_lock); rqst = xprt_lookup_rqst(xprt, rep->rr_xid); if (!rqst) goto out_norqst; @@ -1357,7 +1357,7 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep) credits = buf->rb_max_requests; buf->rb_credits = credits; - spin_unlock(&xprt->recv_lock); + spin_unlock(&xprt->queue_lock); req = rpcr_to_rdmar(rqst); req->rl_reply = rep; @@ -1378,7 +1378,7 @@ out_badversion: * is corrupt. */ out_norqst: - spin_unlock(&xprt->recv_lock); + spin_unlock(&xprt->queue_lock); trace_xprtrdma_reply_rqst(rep); goto repost; diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c index a68180090554..09b12b7568fe 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c +++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c @@ -56,7 +56,7 @@ int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, __be32 *rdma_resp, if (src->iov_len < 24) goto out_shortreply; - spin_lock(&xprt->recv_lock); + spin_lock(&xprt->queue_lock); req = xprt_lookup_rqst(xprt, xid); if (!req) goto out_notfound; @@ -86,7 +86,7 @@ int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, __be32 *rdma_resp, rcvbuf->len = 0; out_unlock: - spin_unlock(&xprt->recv_lock); + spin_unlock(&xprt->queue_lock); out: return ret; diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 3fbccebd0b10..8d6404259ff9 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -966,12 +966,12 @@ static void xs_local_data_read_skb(struct rpc_xprt *xprt, return; /* Look up and lock the request corresponding to the given XID */ - spin_lock(&xprt->recv_lock); + spin_lock(&xprt->queue_lock); rovr = xprt_lookup_rqst(xprt, *xp); if (!rovr) goto out_unlock; xprt_pin_rqst(rovr); - spin_unlock(&xprt->recv_lock); + spin_unlock(&xprt->queue_lock); task = rovr->rq_task; copied = rovr->rq_private_buf.buflen; @@ -980,16 +980,16 @@ static void xs_local_data_read_skb(struct rpc_xprt *xprt, if (xs_local_copy_to_xdr(&rovr->rq_private_buf, skb)) { dprintk("RPC: sk_buff copy failed\n"); - spin_lock(&xprt->recv_lock); + spin_lock(&xprt->queue_lock); goto out_unpin; } - spin_lock(&xprt->recv_lock); + spin_lock(&xprt->queue_lock); xprt_complete_rqst(task, copied); out_unpin: xprt_unpin_rqst(rovr); out_unlock: - spin_unlock(&xprt->recv_lock); + spin_unlock(&xprt->queue_lock); } static void xs_local_data_receive(struct sock_xprt *transport) @@ -1058,13 +1058,13 @@ static void xs_udp_data_read_skb(struct rpc_xprt *xprt, return; /* Look up and lock the request corresponding to the given XID */ - spin_lock(&xprt->recv_lock); + spin_lock(&xprt->queue_lock); rovr = xprt_lookup_rqst(xprt, *xp); if (!rovr) goto out_unlock; xprt_pin_rqst(rovr); xprt_update_rtt(rovr->rq_task); - spin_unlock(&xprt->recv_lock); + spin_unlock(&xprt->queue_lock); task = rovr->rq_task; if ((copied = rovr->rq_private_buf.buflen) > repsize) @@ -1072,7 +1072,7 @@ static void xs_udp_data_read_skb(struct rpc_xprt *xprt, /* Suck it into the iovec, verify checksum if not done by hw. */ if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) { - spin_lock(&xprt->recv_lock); + spin_lock(&xprt->queue_lock); __UDPX_INC_STATS(sk, UDP_MIB_INERRORS); goto out_unpin; } @@ -1081,13 +1081,13 @@ static void xs_udp_data_read_skb(struct rpc_xprt *xprt, spin_lock_bh(&xprt->transport_lock); xprt_adjust_cwnd(xprt, task, copied); spin_unlock_bh(&xprt->transport_lock); - spin_lock(&xprt->recv_lock); + spin_lock(&xprt->queue_lock); xprt_complete_rqst(task, copied); __UDPX_INC_STATS(sk, UDP_MIB_INDATAGRAMS); out_unpin: xprt_unpin_rqst(rovr); out_unlock: - spin_unlock(&xprt->recv_lock); + spin_unlock(&xprt->queue_lock); } static void xs_udp_data_receive(struct sock_xprt *transport) @@ -1356,24 +1356,24 @@ static inline int xs_tcp_read_reply(struct rpc_xprt *xprt, dprintk("RPC: read reply XID %08x\n", ntohl(transport->recv.xid)); /* Find and lock the request corresponding to this xid */ - spin_lock(&xprt->recv_lock); + spin_lock(&xprt->queue_lock); req = xprt_lookup_rqst(xprt, transport->recv.xid); if (!req) { dprintk("RPC: XID %08x request not found!\n", ntohl(transport->recv.xid)); - spin_unlock(&xprt->recv_lock); + spin_unlock(&xprt->queue_lock); return -1; } xprt_pin_rqst(req); - spin_unlock(&xprt->recv_lock); + spin_unlock(&xprt->queue_lock); xs_tcp_read_common(xprt, desc, req); - spin_lock(&xprt->recv_lock); + spin_lock(&xprt->queue_lock); if (!(transport->recv.flags & TCP_RCV_COPY_DATA)) xprt_complete_rqst(req->rq_task, transport->recv.copied); xprt_unpin_rqst(req); - spin_unlock(&xprt->recv_lock); + spin_unlock(&xprt->queue_lock); return 0; } -- cgit v1.2.3 From edc81dcd5b7f699c4049042b35c904396642032e Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Wed, 22 Aug 2018 17:55:46 -0400 Subject: SUNRPC: Refactor xprt_transmit() to remove the reply queue code Separate out the action of adding a request to the reply queue so that the backchannel code can simply skip calling it altogether. Signed-off-by: Trond Myklebust --- net/sunrpc/backchannel_rqst.c | 1 - net/sunrpc/clnt.c | 5 ++ net/sunrpc/xprt.c | 127 +++++++++++++++++++++++++------------- net/sunrpc/xprtrdma/backchannel.c | 1 - 4 files changed, 88 insertions(+), 46 deletions(-) (limited to 'net') diff --git a/net/sunrpc/backchannel_rqst.c b/net/sunrpc/backchannel_rqst.c index 3c15a99b9700..fa5ba6ed3197 100644 --- a/net/sunrpc/backchannel_rqst.c +++ b/net/sunrpc/backchannel_rqst.c @@ -91,7 +91,6 @@ struct rpc_rqst *xprt_alloc_bc_req(struct rpc_xprt *xprt, gfp_t gfp_flags) return NULL; req->rq_xprt = xprt; - INIT_LIST_HEAD(&req->rq_list); INIT_LIST_HEAD(&req->rq_bc_list); /* Preallocate one XDR receive buffer */ diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c index a858366cd15d..414966273a3f 100644 --- a/net/sunrpc/clnt.c +++ b/net/sunrpc/clnt.c @@ -1962,6 +1962,11 @@ call_transmit(struct rpc_task *task) return; } } + + /* Add task to reply queue before transmission to avoid races */ + if (rpc_reply_expected(task)) + xprt_request_enqueue_receive(task); + if (!xprt_prepare_transmit(task)) return; task->tk_action = call_transmit_status; diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index 6e3d4b4ee79e..2ae0a4c47d59 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -888,6 +888,62 @@ static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req) wait_var_event(&req->rq_pin, !xprt_is_pinned_rqst(req)); } +static bool +xprt_request_data_received(struct rpc_task *task) +{ + return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) && + READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) != 0; +} + +static bool +xprt_request_need_enqueue_receive(struct rpc_task *task, struct rpc_rqst *req) +{ + return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) && + READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) == 0; +} + +/** + * xprt_request_enqueue_receive - Add an request to the receive queue + * @task: RPC task + * + */ +void +xprt_request_enqueue_receive(struct rpc_task *task) +{ + struct rpc_rqst *req = task->tk_rqstp; + struct rpc_xprt *xprt = req->rq_xprt; + + if (!xprt_request_need_enqueue_receive(task, req)) + return; + spin_lock(&xprt->queue_lock); + + /* Update the softirq receive buffer */ + memcpy(&req->rq_private_buf, &req->rq_rcv_buf, + sizeof(req->rq_private_buf)); + + /* Add request to the receive list */ + list_add_tail(&req->rq_list, &xprt->recv); + set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate); + spin_unlock(&xprt->queue_lock); + + xprt_reset_majortimeo(req); + /* Turn off autodisconnect */ + del_singleshot_timer_sync(&xprt->timer); +} + +/** + * xprt_request_dequeue_receive_locked - Remove a request from the receive queue + * @task: RPC task + * + * Caller must hold xprt->queue_lock. + */ +static void +xprt_request_dequeue_receive_locked(struct rpc_task *task) +{ + if (test_and_clear_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) + list_del(&task->tk_rqstp->rq_list); +} + /** * xprt_update_rtt - Update RPC RTT statistics * @task: RPC request that recently completed @@ -927,24 +983,16 @@ void xprt_complete_rqst(struct rpc_task *task, int copied) xprt->stat.recvs++; - list_del_init(&req->rq_list); req->rq_private_buf.len = copied; /* Ensure all writes are done before we update */ /* req->rq_reply_bytes_recvd */ smp_wmb(); req->rq_reply_bytes_recvd = copied; - clear_bit(RPC_TASK_NEED_RECV, &task->tk_runstate); + xprt_request_dequeue_receive_locked(task); rpc_wake_up_queued_task(&xprt->pending, task); } EXPORT_SYMBOL_GPL(xprt_complete_rqst); -static bool -xprt_request_data_received(struct rpc_task *task) -{ - return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) && - task->tk_rqstp->rq_reply_bytes_recvd != 0; -} - static void xprt_timer(struct rpc_task *task) { struct rpc_rqst *req = task->tk_rqstp; @@ -1018,32 +1066,15 @@ void xprt_transmit(struct rpc_task *task) dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen); - if (!req->rq_reply_bytes_recvd) { - + if (!req->rq_bytes_sent) { + if (xprt_request_data_received(task)) + return; /* Verify that our message lies in the RPCSEC_GSS window */ - if (!req->rq_bytes_sent && rpcauth_xmit_need_reencode(task)) { + if (rpcauth_xmit_need_reencode(task)) { task->tk_status = -EBADMSG; return; } - - if (list_empty(&req->rq_list) && rpc_reply_expected(task)) { - /* - * Add to the list only if we're expecting a reply - */ - /* Update the softirq receive buffer */ - memcpy(&req->rq_private_buf, &req->rq_rcv_buf, - sizeof(req->rq_private_buf)); - /* Add request to the receive list */ - spin_lock(&xprt->queue_lock); - list_add_tail(&req->rq_list, &xprt->recv); - set_