diff options
Diffstat (limited to 'drivers/block/drbd/drbd_receiver.c')
-rw-r--r-- | drivers/block/drbd/drbd_receiver.c | 203 |
1 files changed, 115 insertions, 88 deletions
diff --git a/drivers/block/drbd/drbd_receiver.c b/drivers/block/drbd/drbd_receiver.c index eed4ae9107b4..ea54341df3bf 100644 --- a/drivers/block/drbd/drbd_receiver.c +++ b/drivers/block/drbd/drbd_receiver.c @@ -215,7 +215,7 @@ static void reclaim_finished_net_peer_reqs(struct drbd_device *device, } } -static void drbd_kick_lo_and_reclaim_net(struct drbd_device *device) +static void drbd_reclaim_net_peer_reqs(struct drbd_device *device) { LIST_HEAD(reclaimed); struct drbd_peer_request *peer_req, *t; @@ -223,11 +223,30 @@ static void drbd_kick_lo_and_reclaim_net(struct drbd_device *device) spin_lock_irq(&device->resource->req_lock); reclaim_finished_net_peer_reqs(device, &reclaimed); spin_unlock_irq(&device->resource->req_lock); - list_for_each_entry_safe(peer_req, t, &reclaimed, w.list) drbd_free_net_peer_req(device, peer_req); } +static void conn_reclaim_net_peer_reqs(struct drbd_connection *connection) +{ + struct drbd_peer_device *peer_device; + int vnr; + + rcu_read_lock(); + idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { + struct drbd_device *device = peer_device->device; + if (!atomic_read(&device->pp_in_use_by_net)) + continue; + + kref_get(&device->kref); + rcu_read_unlock(); + drbd_reclaim_net_peer_reqs(device); + kref_put(&device->kref, drbd_destroy_device); + rcu_read_lock(); + } + rcu_read_unlock(); +} + /** * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled) * @device: DRBD device. @@ -265,10 +284,15 @@ struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int if (atomic_read(&device->pp_in_use) < mxb) page = __drbd_alloc_pages(device, number); + /* Try to keep the fast path fast, but occasionally we need + * to reclaim the pages we lended to the network stack. */ + if (page && atomic_read(&device->pp_in_use_by_net) > 512) + drbd_reclaim_net_peer_reqs(device); + while (page == NULL) { prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE); - drbd_kick_lo_and_reclaim_net(device); + drbd_reclaim_net_peer_reqs(device); if (atomic_read(&device->pp_in_use) < mxb) { page = __drbd_alloc_pages(device, number); @@ -1100,6 +1124,11 @@ randomize: } drbd_thread_start(&connection->ack_receiver); + connection->ack_sender = create_singlethread_workqueue("drbd_ack_sender"); + if (!connection->ack_sender) { + drbd_err(connection, "Failed to create workqueue ack_sender\n"); + return 0; + } mutex_lock(&connection->resource->conf_update); /* The discard_my_data flag is a single-shot modifier to the next @@ -1746,7 +1775,7 @@ static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_req } /* - * e_end_resync_block() is called in asender context via + * e_end_resync_block() is called in ack_sender context via * drbd_finish_peer_reqs(). */ static int e_end_resync_block(struct drbd_work *w, int unused) @@ -1920,7 +1949,7 @@ static void restart_conflicting_writes(struct drbd_device *device, } /* - * e_end_block() is called in asender context via drbd_finish_peer_reqs(). + * e_end_block() is called in ack_sender context via drbd_finish_peer_reqs(). */ static int e_end_block(struct drbd_work *w, int cancel) { @@ -2211,7 +2240,7 @@ static int handle_write_conflicts(struct drbd_device *device, peer_req->w.cb = superseded ? e_send_superseded : e_send_retry_write; list_add_tail(&peer_req->w.list, &device->done_ee); - wake_asender(connection); + queue_work(connection->ack_sender, &peer_req->peer_device->send_acks_work); err = -ENOENT; goto out; @@ -4050,7 +4079,7 @@ static int receive_state(struct drbd_connection *connection, struct packet_info os = ns = drbd_read_state(device); spin_unlock_irq(&device->resource->req_lock); - /* If some other part of the code (asender thread, timeout) + /* If some other part of the code (ack_receiver thread, timeout) * already decided to close the connection again, * we must not "re-establish" it here. */ if (os.conn <= C_TEAR_DOWN) @@ -4655,8 +4684,12 @@ static void conn_disconnect(struct drbd_connection *connection) */ conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD); - /* asender does not clean up anything. it must not interfere, either */ + /* ack_receiver does not clean up anything. it must not interfere, either */ drbd_thread_stop(&connection->ack_receiver); + if (connection->ack_sender) { + destroy_workqueue(connection->ack_sender); + connection->ack_sender = NULL; + } drbd_free_sock(connection); rcu_read_lock(); @@ -5425,49 +5458,39 @@ static int got_skip(struct drbd_connection *connection, struct packet_info *pi) return 0; } -static int connection_finish_peer_reqs(struct drbd_connection *connection) +struct meta_sock_cmd { + size_t pkt_size; + int (*fn)(struct drbd_connection *connection, struct packet_info *); +}; + +static void set_rcvtimeo(struct drbd_connection *connection, bool ping_timeout) { - struct drbd_peer_device *peer_device; - int vnr, not_empty = 0; + long t; + struct net_conf *nc; - do { - clear_bit(SIGNAL_ASENDER, &connection->flags); - flush_signals(current); + rcu_read_lock(); + nc = rcu_dereference(connection->net_conf); + t = ping_timeout ? nc->ping_timeo : nc->ping_int; + rcu_read_unlock(); - rcu_read_lock(); - idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { - struct drbd_device *device = peer_device->device; - kref_get(&device->kref); - rcu_read_unlock(); - if (drbd_finish_peer_reqs(device)) { - kref_put(&device->kref, drbd_destroy_device); - return 1; - } - kref_put(&device->kref, drbd_destroy_device); - rcu_read_lock(); - } - set_bit(SIGNAL_ASENDER, &connection->flags); + t *= HZ; + if (ping_timeout) + t /= 10; - spin_lock_irq(&connection->resource->req_lock); - idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { - struct drbd_device *device = peer_device->device; - not_empty = !list_empty(&device->done_ee); - if (not_empty) - break; - } - spin_unlock_irq(&connection->resource->req_lock); - rcu_read_unlock(); - } while (not_empty); + connection->meta.socket->sk->sk_rcvtimeo = t; +} - return 0; +static void set_ping_timeout(struct drbd_connection *connection) +{ + set_rcvtimeo(connection, 1); } -struct asender_cmd { - size_t pkt_size; - int (*fn)(struct drbd_connection *connection, struct packet_info *); -}; +static void set_idle_timeout(struct drbd_connection *connection) +{ + set_rcvtimeo(connection, 0); +} -static struct asender_cmd asender_tbl[] = { +static struct meta_sock_cmd ack_receiver_tbl[] = { [P_PING] = { 0, got_Ping }, [P_PING_ACK] = { 0, got_PingAck }, [P_RECV_ACK] = { sizeof(struct p_block_ack), got_BlockAck }, @@ -5490,61 +5513,37 @@ static struct asender_cmd asender_tbl[] = { int drbd_ack_receiver(struct drbd_thread *thi) { struct drbd_connection *connection = thi->connection; - struct asender_cmd *cmd = NULL; + struct meta_sock_cmd *cmd = NULL; struct packet_info pi; + unsigned long pre_recv_jif; int rv; void *buf = connection->meta.rbuf; int received = 0; unsigned int header_size = drbd_header_size(connection); int expect = header_size; bool ping_timeout_active = false; - struct net_conf *nc; - int ping_timeo, tcp_cork, ping_int; struct sched_param param = { .sched_priority = 2 }; rv = sched_setscheduler(current, SCHED_RR, ¶m); if (rv < 0) - drbd_err(connection, "drbd_asender: ERROR set priority, ret=%d\n", rv); + drbd_err(connection, "drbd_ack_receiver: ERROR set priority, ret=%d\n", rv); while (get_t_state(thi) == RUNNING) { drbd_thread_current_set_cpu(thi); - rcu_read_lock(); - nc = rcu_dereference(connection->net_conf); - ping_timeo = nc->ping_timeo; - tcp_cork = nc->tcp_cork; - ping_int = nc->ping_int; - rcu_read_unlock(); + conn_reclaim_net_peer_reqs(connection); if (test_and_clear_bit(SEND_PING, &connection->flags)) { if (drbd_send_ping(connection)) { drbd_err(connection, "drbd_send_ping has failed\n"); goto reconnect; } - connection->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10; + set_ping_timeout(connection); ping_timeout_active = true; } - /* TODO: conditionally cork; it may hurt latency if we cork without - much to send */ - if (tcp_cork) - drbd_tcp_cork(connection->meta.socket); - if (connection_finish_peer_reqs(connection)) { - drbd_err(connection, "connection_finish_peer_reqs() failed\n"); - goto reconnect; - } - /* but unconditionally uncork unless disabled */ - if (tcp_cork) - drbd_tcp_uncork(connection->meta.socket); - - /* short circuit, recv_msg would return EINTR anyways. */ - if (signal_pending(current)) - continue; - + pre_recv_jif = jiffies; rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0); - clear_bit(SIGNAL_ASENDER, &connection->flags); - - flush_signals(current); /* Note: * -EINTR (on meta) we got a signal @@ -5556,7 +5555,6 @@ int drbd_ack_receiver(struct drbd_thread *thi) * rv < expected: "woken" by signal during receive * rv == 0 : "connection shut down by peer" */ -received_more: if (likely(rv > 0)) { received += rv; buf += rv; @@ -5578,8 +5576,7 @@ received_more: } else if (rv == -EAGAIN) { /* If the data socket received something meanwhile, * that is good enough: peer is still alive. */ - if (time_after(connection->last_received, - jiffies - connection->meta.socket->sk->sk_rcvtimeo)) + if (time_after(connection->last_received, pre_recv_jif)) continue; if (ping_timeout_active) { drbd_err(connection, "PingAck did not arrive in time.\n"); @@ -5588,6 +5585,10 @@ received_more: set_bit(SEND_PING, &connection->flags); continue; } else if (rv == -EINTR) { + /* maybe drbd_thread_stop(): the while condition will notice. + * maybe woken for send_ping: we'll send a ping above, + * and change the rcvtimeo */ + flush_signals(current); continue; } else { drbd_err(connection, "sock_recvmsg returned %d\n", rv); @@ -5597,8 +5598,8 @@ received_more: if (received == expect && cmd == NULL) { if (decode_header(connection, connection->meta.rbuf, &pi)) goto reconnect; - cmd = &asender_tbl[pi.cmd]; - if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) { + cmd = &ack_receiver_tbl[pi.cmd]; + if (pi.cmd >= ARRAY_SIZE(ack_receiver_tbl) || !cmd->fn) { drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n", cmdname(pi.cmd), pi.cmd); goto disconnect; @@ -5621,9 +5622,8 @@ received_more: connection->last_received = jiffies; - if (cmd == &asender_tbl[P_PING_ACK]) { - /* restore idle timeout */ - connection->meta.socket->sk->sk_rcvtimeo = ping_int * HZ; + if (cmd == &ack_receiver_tbl[P_PING_ACK]) { + set_idle_timeout(connection); ping_timeout_active = false; } @@ -5632,11 +5632,6 @@ received_more: expect = header_size; cmd = NULL; } - if (test_bit(SEND_PING, &connection->flags)) - continue; - rv = drbd_recv_short(connection->meta.socket, buf, expect-received, MSG_DONTWAIT); - if (rv > 0) - goto received_more; } if (0) { @@ -5648,9 +5643,41 @@ reconnect: disconnect: conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD); } - clear_bit(SIGNAL_ASENDER, &connection->flags); - drbd_info(connection, "asender terminated\n"); + drbd_info(connection, "ack_receiver terminated\n"); return 0; } + +void drbd_send_acks_wf(struct work_struct *ws) +{ + struct drbd_peer_device *peer_device = + container_of(ws, struct drbd_peer_device, send_acks_work); + struct drbd_connection *connection = peer_device->connection; + struct drbd_device *device = peer_device->device; + struct net_conf *nc; + int tcp_cork, err; + + rcu_read_lock(); + nc = rcu_dereference(connection->net_conf); + tcp_cork = nc->tcp_cork; + rcu_read_unlock(); + + if (tcp_cork) + drbd_tcp_cork(connection->meta.socket); + + err = drbd_finish_peer_reqs(device); + kref_put(&device->kref, drbd_destroy_device); + /* get is in drbd_endio_write_sec_final(). That is necessary to keep the + struct work_struct send_acks_work alive, which is in the peer_device object */ + + if (err) { + conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD); + return; + } + + if (tcp_cork) + drbd_tcp_uncork(connection->meta.socket); + + return; +} |