summaryrefslogtreecommitdiffstats
path: root/net
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2012-07-31 14:35:28 -0700
committerLinus Torvalds <torvalds@linux-foundation.org>2012-07-31 14:35:28 -0700
commitcc8362b1f6d724e46f515121d442779924b19fec (patch)
tree86fb5c3767e538ec9ded57dd7b3ce5d69dcde691 /net
parent2e3ee613480563a6d5c01b57d342e65cc58c06df (diff)
parent1fe5e9932156f6122c3b1ff6ba7541c27c86718c (diff)
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull Ceph changes from Sage Weil: "Lots of stuff this time around: - lots of cleanup and refactoring in the libceph messenger code, and many hard to hit races and bugs closed as a result. - lots of cleanup and refactoring in the rbd code from Alex Elder, mostly in preparation for the layering functionality that will be coming in 3.7. - some misc rbd cleanups from Josh Durgin that are finally going upstream - support for CRUSH tunables (used by newer clusters to improve the data placement) - some cleanup in our use of d_parent that Al brought up a while back - a random collection of fixes across the tree There is another patch coming that fixes up our ->atomic_open() behavior, but I'm going to hammer on it a bit more before sending it." Fix up conflicts due to commits that were already committed earlier in drivers/block/rbd.c, net/ceph/{messenger.c, osd_client.c} * 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (132 commits) rbd: create rbd_refresh_helper() rbd: return obj version in __rbd_refresh_header() rbd: fixes in rbd_header_from_disk() rbd: always pass ops array to rbd_req_sync_op() rbd: pass null version pointer in add_snap() rbd: make rbd_create_rw_ops() return a pointer rbd: have __rbd_add_snap_dev() return a pointer libceph: recheck con state after allocating incoming message libceph: change ceph_con_in_msg_alloc convention to be less weird libceph: avoid dropping con mutex before fault libceph: verify state after retaking con lock after dispatch libceph: revoke mon_client messages on session restart libceph: fix handling of immediate socket connect failure ceph: update MAINTAINERS file libceph: be less chatty about stray replies libceph: clear all flags on con_close libceph: clean up con flags libceph: replace connection state bits with states libceph: drop unnecessary CLOSED check in socket state change callback libceph: close socket directly from ceph_con_close() ...
Diffstat (limited to 'net')
-rw-r--r--net/ceph/ceph_common.c25
-rw-r--r--net/ceph/crush/mapper.c13
-rw-r--r--net/ceph/messenger.c925
-rw-r--r--net/ceph/mon_client.c76
-rw-r--r--net/ceph/msgpool.c7
-rw-r--r--net/ceph/osd_client.c77
-rw-r--r--net/ceph/osdmap.c59
7 files changed, 737 insertions, 445 deletions
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index ba4323bce0e9..69e38db28e5f 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -17,6 +17,7 @@
#include <linux/string.h>
+#include <linux/ceph/ceph_features.h>
#include <linux/ceph/libceph.h>
#include <linux/ceph/debugfs.h>
#include <linux/ceph/decode.h>
@@ -460,27 +461,23 @@ struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private,
client->auth_err = 0;
client->extra_mon_dispatch = NULL;
- client->supported_features = CEPH_FEATURE_SUPPORTED_DEFAULT |
+ client->supported_features = CEPH_FEATURES_SUPPORTED_DEFAULT |
supported_features;
- client->required_features = CEPH_FEATURE_REQUIRED_DEFAULT |
+ client->required_features = CEPH_FEATURES_REQUIRED_DEFAULT |
required_features;
/* msgr */
if (ceph_test_opt(client, MYIP))
myaddr = &client->options->my_addr;
- client->msgr = ceph_messenger_create(myaddr,
- client->supported_features,
- client->required_features);
- if (IS_ERR(client->msgr)) {
- err = PTR_ERR(client->msgr);
- goto fail;
- }
- client->msgr->nocrc = ceph_test_opt(client, NOCRC);
+ ceph_messenger_init(&client->msgr, myaddr,
+ client->supported_features,
+ client->required_features,
+ ceph_test_opt(client, NOCRC));
/* subsystems */
err = ceph_monc_init(&client->monc, client);
if (err < 0)
- goto fail_msgr;
+ goto fail;
err = ceph_osdc_init(&client->osdc, client);
if (err < 0)
goto fail_monc;
@@ -489,8 +486,6 @@ struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private,
fail_monc:
ceph_monc_stop(&client->monc);
-fail_msgr:
- ceph_messenger_destroy(client->msgr);
fail:
kfree(client);
return ERR_PTR(err);
@@ -501,6 +496,8 @@ void ceph_destroy_client(struct ceph_client *client)
{
dout("destroy_client %p\n", client);
+ atomic_set(&client->msgr.stopping, 1);
+
/* unmount */
ceph_osdc_stop(&client->osdc);
@@ -508,8 +505,6 @@ void ceph_destroy_client(struct ceph_client *client)
ceph_debugfs_client_cleanup(client);
- ceph_messenger_destroy(client->msgr);
-
ceph_destroy_options(client->options);
kfree(client);
diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c
index d7edc24333b8..35fce755ce10 100644
--- a/net/ceph/crush/mapper.c
+++ b/net/ceph/crush/mapper.c
@@ -306,7 +306,6 @@ static int crush_choose(const struct crush_map *map,
int item = 0;
int itemtype;
int collide, reject;
- const unsigned int orig_tries = 5; /* attempts before we fall back to search */
dprintk("CHOOSE%s bucket %d x %d outpos %d numrep %d\n", recurse_to_leaf ? "_LEAF" : "",
bucket->id, x, outpos, numrep);
@@ -351,8 +350,9 @@ static int crush_choose(const struct crush_map *map,
reject = 1;
goto reject;
}
- if (flocal >= (in->size>>1) &&
- flocal > orig_tries)
+ if (map->choose_local_fallback_tries > 0 &&
+ flocal >= (in->size>>1) &&
+ flocal > map->choose_local_fallback_tries)
item = bucket_perm_choose(in, x, r);
else
item = crush_bucket_choose(in, x, r);
@@ -422,13 +422,14 @@ reject:
ftotal++;
flocal++;
- if (collide && flocal < 3)
+ if (collide && flocal <= map->choose_local_tries)
/* retry locally a few times */
retry_bucket = 1;
- else if (flocal <= in->size + orig_tries)
+ else if (map->choose_local_fallback_tries > 0 &&
+ flocal <= in->size + map->choose_local_fallback_tries)
/* exhaustive bucket search */
retry_bucket = 1;
- else if (ftotal < 20)
+ else if (ftotal <= map->choose_total_tries)
/* then retry descent */
retry_descent = 1;
else
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 10255e81be79..b9796750034a 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -29,6 +29,74 @@
* the sender.
*/
+/*
+ * We track the state of the socket on a given connection using
+ * values defined below. The transition to a new socket state is
+ * handled by a function which verifies we aren't coming from an
+ * unexpected state.
+ *
+ * --------
+ * | NEW* | transient initial state
+ * --------
+ * | con_sock_state_init()
+ * v
+ * ----------
+ * | CLOSED | initialized, but no socket (and no
+ * ---------- TCP connection)
+ * ^ \
+ * | \ con_sock_state_connecting()
+ * | ----------------------
+ * | \
+ * + con_sock_state_closed() \
+ * |+--------------------------- \
+ * | \ \ \
+ * | ----------- \ \
+ * | | CLOSING | socket event; \ \
+ * | ----------- await close \ \
+ * | ^ \ |
+ * | | \ |
+ * | + con_sock_state_closing() \ |
+ * | / \ | |
+ * | / --------------- | |
+ * | / \ v v
+ * | / --------------
+ * | / -----------------| CONNECTING | socket created, TCP
+ * | | / -------------- connect initiated
+ * | | | con_sock_state_connected()
+ * | | v
+ * -------------
+ * | CONNECTED | TCP connection established
+ * -------------
+ *
+ * State values for ceph_connection->sock_state; NEW is assumed to be 0.
+ */
+
+#define CON_SOCK_STATE_NEW 0 /* -> CLOSED */
+#define CON_SOCK_STATE_CLOSED 1 /* -> CONNECTING */
+#define CON_SOCK_STATE_CONNECTING 2 /* -> CONNECTED or -> CLOSING */
+#define CON_SOCK_STATE_CONNECTED 3 /* -> CLOSING or -> CLOSED */
+#define CON_SOCK_STATE_CLOSING 4 /* -> CLOSED */
+
+/*
+ * connection states
+ */
+#define CON_STATE_CLOSED 1 /* -> PREOPEN */
+#define CON_STATE_PREOPEN 2 /* -> CONNECTING, CLOSED */
+#define CON_STATE_CONNECTING 3 /* -> NEGOTIATING, CLOSED */
+#define CON_STATE_NEGOTIATING 4 /* -> OPEN, CLOSED */
+#define CON_STATE_OPEN 5 /* -> STANDBY, CLOSED */
+#define CON_STATE_STANDBY 6 /* -> PREOPEN, CLOSED */
+
+/*
+ * ceph_connection flag bits
+ */
+#define CON_FLAG_LOSSYTX 0 /* we can close channel or drop
+ * messages on errors */
+#define CON_FLAG_KEEPALIVE_PENDING 1 /* we need to send a keepalive */
+#define CON_FLAG_WRITE_PENDING 2 /* we have data ready to send */
+#define CON_FLAG_SOCK_CLOSED 3 /* socket state changed to closed */
+#define CON_FLAG_BACKOFF 4 /* need to retry queuing delayed work */
+
/* static tag bytes (protocol control messages) */
static char tag_msg = CEPH_MSGR_TAG_MSG;
static char tag_ack = CEPH_MSGR_TAG_ACK;
@@ -147,72 +215,130 @@ void ceph_msgr_flush(void)
}
EXPORT_SYMBOL(ceph_msgr_flush);
+/* Connection socket state transition functions */
+
+static void con_sock_state_init(struct ceph_connection *con)
+{
+ int old_state;
+
+ old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED);
+ if (WARN_ON(old_state != CON_SOCK_STATE_NEW))
+ printk("%s: unexpected old state %d\n", __func__, old_state);
+ dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
+ CON_SOCK_STATE_CLOSED);
+}
+
+static void con_sock_state_connecting(struct ceph_connection *con)
+{
+ int old_state;
+
+ old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTING);
+ if (WARN_ON(old_state != CON_SOCK_STATE_CLOSED))
+ printk("%s: unexpected old state %d\n", __func__, old_state);
+ dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
+ CON_SOCK_STATE_CONNECTING);
+}
+
+static void con_sock_state_connected(struct ceph_connection *con)
+{
+ int old_state;
+
+ old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTED);
+ if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTING))
+ printk("%s: unexpected old state %d\n", __func__, old_state);
+ dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
+ CON_SOCK_STATE_CONNECTED);
+}
+
+static void con_sock_state_closing(struct ceph_connection *con)
+{
+ int old_state;
+
+ old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSING);
+ if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTING &&
+ old_state != CON_SOCK_STATE_CONNECTED &&
+ old_state != CON_SOCK_STATE_CLOSING))
+ printk("%s: unexpected old state %d\n", __func__, old_state);
+ dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
+ CON_SOCK_STATE_CLOSING);
+}
+
+static void con_sock_state_closed(struct ceph_connection *con)
+{
+ int old_state;
+
+ old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED);
+ if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTED &&
+ old_state != CON_SOCK_STATE_CLOSING &&
+ old_state != CON_SOCK_STATE_CONNECTING &&
+ old_state != CON_SOCK_STATE_CLOSED))
+ printk("%s: unexpected old state %d\n", __func__, old_state);
+ dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
+ CON_SOCK_STATE_CLOSED);
+}
/*
* socket callback functions
*/
/* data available on socket, or listen socket received a connect */
-static void ceph_data_ready(struct sock *sk, int count_unused)
+static void ceph_sock_data_ready(struct sock *sk, int count_unused)
{
struct ceph_connection *con = sk->sk_user_data;
+ if (atomic_read(&con->msgr->stopping)) {
+ return;
+ }
if (sk->sk_state != TCP_CLOSE_WAIT) {
- dout("ceph_data_ready on %p state = %lu, queueing work\n",
+ dout("%s on %p state = %lu, queueing work\n", __func__,
con, con->state);
queue_con(con);
}
}
/* socket has buffer space for writing */
-static void ceph_write_space(struct sock *sk)
+static void ceph_sock_write_space(struct sock *sk)
{
struct ceph_connection *con = sk->sk_user_data;
/* only queue to workqueue if there is data we want to write,
* and there is sufficient space in the socket buffer to accept
- * more data. clear SOCK_NOSPACE so that ceph_write_space()
+ * more data. clear SOCK_NOSPACE so that ceph_sock_write_space()
* doesn't get called again until try_write() fills the socket
* buffer. See net/ipv4/tcp_input.c:tcp_check_space()
* and net/core/stream.c:sk_stream_write_space().
*/
- if (test_bit(WRITE_PENDING, &con->state)) {
+ if (test_bit(CON_FLAG_WRITE_PENDING, &con->flags)) {
if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) {
- dout("ceph_write_space %p queueing write work\n", con);
+ dout("%s %p queueing write work\n", __func__, con);
clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
queue_con(con);
}
} else {
- dout("ceph_write_space %p nothing to write\n", con);
+ dout("%s %p nothing to write\n", __func__, con);
}
}
/* socket's state has changed */
-static void ceph_state_change(struct sock *sk)
+static void ceph_sock_state_change(struct sock *sk)
{
struct ceph_connection *con = sk->sk_user_data;
- dout("ceph_state_change %p state = %lu sk_state = %u\n",
+ dout("%s %p state = %lu sk_state = %u\n", __func__,
con, con->state, sk->sk_state);
- if (test_bit(CLOSED, &con->state))
- return;
-
switch (sk->sk_state) {
case TCP_CLOSE:
- dout("ceph_state_change TCP_CLOSE\n");
+ dout("%s TCP_CLOSE\n", __func__);
case TCP_CLOSE_WAIT:
- dout("ceph_state_change TCP_CLOSE_WAIT\n");
- if (test_and_set_bit(SOCK_CLOSED, &con->state) == 0) {
- if (test_bit(CONNECTING, &con->state))
- con->error_msg = "connection failed";
- else
- con->error_msg = "socket closed";
- queue_con(con);
- }
+ dout("%s TCP_CLOSE_WAIT\n", __func__);
+ con_sock_state_closing(con);
+ set_bit(CON_FLAG_SOCK_CLOSED, &con->flags);
+ queue_con(con);
break;
case TCP_ESTABLISHED:
- dout("ceph_state_change TCP_ESTABLISHED\n");
+ dout("%s TCP_ESTABLISHED\n", __func__);
+ con_sock_state_connected(con);
queue_con(con);
break;
default: /* Everything else is uninteresting */
@@ -228,9 +354,9 @@ static void set_sock_callbacks(struct socket *sock,
{
struct sock *sk = sock->sk;
sk->sk_user_data = con;
- sk->sk_data_ready = ceph_data_ready;
- sk->sk_write_space = ceph_write_space;
- sk->sk_state_change = ceph_state_change;
+ sk->sk_data_ready = ceph_sock_data_ready;
+ sk->sk_write_space = ceph_sock_write_space;
+ sk->sk_state_change = ceph_sock_state_change;
}
@@ -262,6 +388,7 @@ static int ceph_tcp_connect(struct ceph_connection *con)
dout("connect %s\n", ceph_pr_addr(&con->peer_addr.in_addr));
+ con_sock_state_connecting(con);
ret = sock->ops->connect(sock, (struct sockaddr *)paddr, sizeof(*paddr),
O_NONBLOCK);
if (ret == -EINPROGRESS) {
@@ -277,7 +404,6 @@ static int ceph_tcp_connect(struct ceph_connection *con)
return ret;
}
con->sock = sock;
-
return 0;
}
@@ -333,16 +459,24 @@ static int ceph_tcp_sendpage(struct socket *sock, struct page *page,
*/
static int con_close_socket(struct ceph_connection *con)
{
- int rc;
+ int rc = 0;
dout("con_close_socket on %p sock %p\n", con, con->sock);
- if (!con->sock)
- return 0;
- set_bit(SOCK_CLOSED, &con->state);
- rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR);
- sock_release(con->sock);
- con->sock = NULL;
- clear_bit(SOCK_CLOSED, &con->state);
+ if (con->sock) {
+ rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR);
+ sock_release(con->sock);
+ con->sock = NULL;
+ }
+
+ /*
+ * Forcibly clear the SOCK_CLOSED flag. It gets set
+ * independent of the connection mutex, and we could have
+ * received a socket close event before we had the chance to
+ * shut the socket down.
+ */
+ clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags);
+
+ con_sock_state_closed(con);
return rc;
}
@@ -353,6 +487,10 @@ static int con_close_socket(struct ceph_connection *con)
static void ceph_msg_remove(struct ceph_msg *msg)
{
list_del_init(&msg->list_head);
+ BUG_ON(msg->con == NULL);
+ msg->con->ops->put(msg->con);
+ msg->con = NULL;
+
ceph_msg_put(msg);
}
static void ceph_msg_remove_list(struct list_head *head)
@@ -372,8 +510,11 @@ static void reset_connection(struct ceph_connection *con)
ceph_msg_remove_list(&con->out_sent);
if (con->in_msg) {
+ BUG_ON(con->in_msg->con != con);
+ con->in_msg->con = NULL;
ceph_msg_put(con->in_msg);
con->in_msg = NULL;
+ con->ops->put(con);
}
con->connect_seq = 0;
@@ -391,32 +532,44 @@ static void reset_connection(struct ceph_connection *con)
*/
void ceph_con_close(struct ceph_connection *con)
{
+ mutex_lock(&con->mutex);
dout("con_close %p peer %s\n", con,
ceph_pr_addr(&con->peer_addr.in_addr));
- set_bit(CLOSED, &con->state); /* in case there's queued work */
- clear_bit(STANDBY, &con->state); /* avoid connect_seq bump */
- clear_bit(LOSSYTX, &con->state); /* so we retry next connect */
- clear_bit(KEEPALIVE_PENDING, &con->state);
- clear_bit(WRITE_PENDING, &con->state);
- mutex_lock(&con->mutex);
+ con->state = CON_STATE_CLOSED;
+
+ clear_bit(CON_FLAG_LOSSYTX, &con->flags); /* so we retry next connect */
+ clear_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags);
+ clear_bit(CON_FLAG_WRITE_PENDING, &con->flags);
+ clear_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags);
+ clear_bit(CON_FLAG_BACKOFF, &con->flags);
+
reset_connection(con);
con->peer_global_seq = 0;
cancel_delayed_work(&con->work);
+ con_close_socket(con);
mutex_unlock(&con->mutex);
- queue_con(con);
}
EXPORT_SYMBOL(ceph_con_close);
/*
* Reopen a closed connection, with a new peer address.
*/
-void ceph_con_open(struct ceph_connection *con, struct ceph_entity_addr *addr)
+void ceph_con_open(struct ceph_connection *con,
+ __u8 entity_type, __u64 entity_num,
+ struct ceph_entity_addr *addr)
{
+ mutex_lock(&con->mutex);
dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr));
- set_bit(OPENING, &con->state);
- clear_bit(CLOSED, &con->state);
+
+ BUG_ON(con->state != CON_STATE_CLOSED);
+ con->state = CON_STATE_PREOPEN;
+
+ con->peer_name.type = (__u8) entity_type;
+ con->peer_name.num = cpu_to_le64(entity_num);
+
memcpy(&con->peer_addr, addr, sizeof(*addr));
con->delay = 0; /* reset backoff memory */
+ mutex_unlock(&con->mutex);
queue_con(con);
}
EXPORT_SYMBOL(ceph_con_open);
@@ -430,42 +583,26 @@ bool ceph_con_opened(struct ceph_connection *con)
}
/*
- * generic get/put
- */
-struct ceph_connection *ceph_con_get(struct ceph_connection *con)
-{
- int nref = __atomic_add_unless(&con->nref, 1, 0);
-
- dout("con_get %p nref = %d -> %d\n", con, nref, nref + 1);
-
- return nref ? con : NULL;
-}
-
-void ceph_con_put(struct ceph_connection *con)
-{
- int nref = atomic_dec_return(&con->nref);
-
- BUG_ON(nref < 0);
- if (nref == 0) {
- BUG_ON(con->sock);
- kfree(con);
- }
- dout("con_put %p nref = %d -> %d\n", con, nref + 1, nref);
-}
-
-/*
* initialize a new connection.
*/
-void ceph_con_init(struct ceph_messenger *msgr, struct ceph_connection *con)
+void ceph_con_init(struct ceph_connection *con, void *private,
+ const struct ceph_connection_operations *ops,
+ struct ceph_messenger *msgr)
{
dout("con_init %p\n", con);
memset(con, 0, sizeof(*con));
- atomic_set(&con->nref, 1);
+ con->private = private;
+ con->ops = ops;
con->msgr = msgr;
+
+ con_sock_state_init(con);
+
mutex_init(&con->mutex);
INIT_LIST_HEAD(&con->out_queue);
INIT_LIST_HEAD(&con->out_sent);
INIT_DELAYED_WORK(&con->work, con_work);
+
+ con->state = CON_STATE_CLOSED;
}
EXPORT_SYMBOL(ceph_con_init);
@@ -486,14 +623,14 @@ static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt)
return ret;
}
-static void ceph_con_out_kvec_reset(struct ceph_connection *con)
+static void con_out_kvec_reset(struct ceph_connection *con)
{
con->out_kvec_left = 0;
con->out_kvec_bytes = 0;
con->out_kvec_cur = &con->out_kvec[0];
}
-static void ceph_con_out_kvec_add(struct ceph_connection *con,
+static void con_out_kvec_add(struct ceph_connection *con,
size_t size, void *data)
{
int index;
@@ -507,6 +644,53 @@ static void ceph_con_out_kvec_add(struct ceph_connection *con,
con->out_kvec_bytes += size;
}
+#ifdef CONFIG_BLOCK
+static void init_bio_iter(struct bio *bio, struct bio **iter, int *seg)
+{
+ if (!bio) {
+ *iter = NULL;
+ *seg = 0;
+ return;
+ }
+ *iter = bio;
+ *seg = bio->bi_idx;
+}
+
+static void iter_bio_next(struct bio **bio_iter, int *seg)
+{
+ if (*bio_iter == NULL)
+ return;
+
+ BUG_ON(*seg >= (*bio_iter)->bi_vcnt);
+
+ (*seg)++;
+ if (*seg == (*bio_iter)->bi_vcnt)
+ init_bio_iter((*bio_iter)->bi_next, bio_iter, seg);
+}
+#endif
+
+static void prepare_write_message_data(struct ceph_connection *con)
+{
+ struct ceph_msg *msg = con->out_msg;
+
+ BUG_ON(!msg);
+ BUG_ON(!msg->hdr.data_len);
+
+ /* initialize page iterator */
+ con->out_msg_pos.page = 0;
+ if (msg->pages)
+ con->out_msg_pos.page_pos = msg->page_alignment;
+ else
+ con->out_msg_pos.page_pos = 0;
+#ifdef CONFIG_BLOCK
+ if (msg->bio)
+ init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg);
+#endif
+ con->out_msg_pos.data_pos = 0;
+ con->out_msg_pos.did_page_crc = false;
+ con->out_more = 1; /* data + footer will follow */
+}
+
/*
* Prepare footer for currently outgoing message, and finish things
* off. Assumes out_kvec* are already valid.. we just add on to the end.
@@ -516,6 +700,8 @@ static void prepare_write_message_footer(struct ceph_connection *con)
struct ceph_msg *m = con->out_msg;
int v = con->out_kvec_left;
+ m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE;
+
dout("prepare_write_message_footer %p\n", con);
con->out_kvec_is_msg = true;
con->out_kvec[v].iov_base = &m->footer;
@@ -534,7 +720,7 @@ static void prepare_write_message(struct ceph_connection *con)
struct ceph_msg *m;
u32 crc;
- ceph_con_out_kvec_reset(con);
+ con_out_kvec_reset(con);
con->out_kvec_is_msg = true;
con->out_msg_done = false;
@@ -542,14 +728,16 @@ static void prepare_write_message(struct ceph_connection *con)
* TCP packet that's a good thing. */
if (con->in_seq > con->in_seq_acked) {
con->in_seq_acked = con->in_seq;
- ceph_con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
+ con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
- ceph_con_out_kvec_add(con, sizeof (con->out_temp_ack),
+ con_out_kvec_add(con, sizeof (con->out_temp_ack),
&con->out_temp_ack);
}
+ BUG_ON(list_empty(&con->out_queue));
m = list_first_entry(&con->out_queue, struct ceph_msg, list_head);
con->out_msg = m;
+ BUG_ON(m->con != con);
/* put message on sent list */
ceph_msg_get(m);
@@ -576,18 +764,18 @@ static void prepare_write_message(struct ceph_connection *con)
BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len);
/* tag + hdr + front + middle */
- ceph_con_out_kvec_add(con, sizeof (tag_msg), &tag_msg);
- ceph_con_out_kvec_add(con, sizeof (m->hdr), &m->hdr);
- ceph_con_out_kvec_add(con, m->front.iov_len, m->front.iov_base);
+ con_out_kvec_add(con, sizeof (tag_msg), &tag_msg);
+ con_out_kvec_add(con, sizeof (m->hdr), &m->hdr);
+ con_out_kvec_add(con, m->front.iov_len, m->front.iov_base);
if (m->middle)
- ceph_con_out_kvec_add(con, m->middle->vec.iov_len,
+ con_out_kvec_add(con, m->middle->vec.iov_len,
m->middle->vec.iov_base);
/* fill in crc (except data pages), footer */
crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc));
con->out_msg->hdr.crc = cpu_to_le32(crc);
- con->out_msg->footer.flags = CEPH_MSG_FOOTER_COMPLETE;
+ con->out_msg->footer.flags = 0;
crc = crc32c(0, m->front.iov_base, m->front.iov_len);
con->out_msg->footer.front_crc = cpu_to_le32(crc);
@@ -597,28 +785,19 @@ static void prepare_write_message(struct ceph_connection *con)
con->out_msg->footer.middle_crc = cpu_to_le32(crc);
} else
con->out_msg->footer.middle_crc = 0;
- con->out_msg->footer.data_crc = 0;
- dout("prepare_write_message front_crc %u data_crc %u\n",
+ dout("%s front_crc %u middle_crc %u\n", __func__,
le32_to_cpu(con->out_msg->footer.front_crc),
le32_to_cpu(con->out_msg->footer.middle_crc));
/* is there a data payload? */
- if (le32_to_cpu(m->hdr.data_len) > 0) {
- /* initialize page iterator */
- con->out_msg_pos.page = 0;
- if (m->pages)
- con->out_msg_pos.page_pos = m->page_alignment;
- else
- con->out_msg_pos.page_pos = 0;
- con->out_msg_pos.data_pos = 0;
- con->out_msg_pos.did_page_crc = false;
- con->out_more = 1; /* data + footer will follow */
- } else {
+ con->out_msg->footer.data_crc = 0;
+ if (m->hdr.data_len)
+ prepare_write_message_data(con);
+ else
/* no, queue up footer too and be done */
prepare_write_message_footer(con);
- }
- set_bit(WRITE_PENDING, &con->state);
+ set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
}
/*
@@ -630,16 +809,16 @@ static void prepare_write_ack(struct ceph_connection *con)
con->in_seq_acked, con->in_seq);
con->in_seq_acked = con->in_seq;
- ceph_con_out_kvec_reset(con);
+ con_out_kvec_reset(con);
- ceph_con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
+ con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
- ceph_con_out_kvec_add(con, sizeof (con->out_temp_ack),
+ con_out_kvec_add(con, sizeof (con->out_temp_ack),
&con->out_temp_ack);
con->out_more = 1; /* more will follow.. eventually.. */
- set_bit(WRITE_PENDING, &con->state);
+ set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
}
/*
@@ -648,9 +827,9 @@ static void prepare_write_ack(struct ceph_connection *con)
static void prepare_write_keepalive(struct ceph_connection *con)
{
dout("prepare_write_keepalive %p\n", con);
- ceph_con_out_kvec_reset(con);
- ceph_con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive);
- set_bit(WRITE_PENDING, &con->state);
+ con_out_kvec_reset(con);
+ con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive);
+ set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
}
/*
@@ -665,27 +844,21 @@ static struct ceph_auth_handshake *get_connect_authorizer(struct ceph_connection
if (!con->ops->get_authorizer) {
con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN;
con->out_connect.authorizer_len = 0;
-
return NULL;
}
/* Can't hold the mutex while getting authorizer */
-
mutex_unlock(&con->mutex);
-
auth = con->ops->get_authorizer(con, auth_proto, con->auth_retry);
-
mutex_lock(&con->mutex);
if (IS_ERR(auth))
return auth;
- if (test_bit(CLOSED, &con->state) || test_bit(OPENING, &con->state))
+ if (con->state != CON_STATE_NEGOTIATING)
return ERR_PTR(-EAGAIN);
con->auth_reply_buf = auth->authorizer_reply_buf;
con->auth_reply_buf_len = auth->authorizer_reply_buf_len;
-
-
return auth;
}
@@ -694,12 +867,12 @@ static struct ceph_auth_handshake *get_connect_authorizer(struct ceph_connection
*/
static void prepare_write_banner(struct ceph_connection *con)
{
- ceph_con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER);
- ceph_con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr),
+ con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER);
+ con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr),
&con->msgr->my_enc_addr);
con->out_more = 0;
- set_bit(WRITE_PENDING, &con->state);
+ set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
}
static int prepare_write_connect(struct ceph_connection *con)
@@ -742,14 +915,15 @@ static int prepare_write_connect(struct ceph_connection *con)
con->out_connect.authorizer_len = auth ?
cpu_to_le32(auth->authorizer_buf_len) : 0;
- ceph_con_out_kvec_add(con, sizeof (con->out_connect),
+ con_out_kvec_reset(con);
+ con_out_kvec_add(con, sizeof (con->out_connect),
&con->out_connect);
if (auth && auth->authorizer_buf_len)
- ceph_con_out_kvec_add(con, auth->authorizer_buf_len,
+ con_out_kvec_add(con, auth->authorizer_buf_len,
auth->authorizer_buf);
con->out_more = 0;
- set_bit(WRITE_PENDING, &con->state);
+ set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
return 0;
}
@@ -797,30 +971,34 @@ out:
return ret; /* done! */
}
-#ifdef CONFIG_BLOCK
-static void init_bio_iter(struct bio *bio, struct bio **iter, int *seg)
+static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
+ size_t len, size_t sent, bool in_trail)
{
- if (!bio) {
- *iter = NULL;
- *seg = 0;
- return;
- }
- *iter = bio;
- *seg = bio->bi_idx;
-}
+ struct ceph_msg *msg = con->out_msg;
-static void iter_bio_next(struct bio **bio_iter, int *seg)
-{
- if (*bio_iter == NULL)
- return;
+ BUG_ON(!msg);
+ BUG_ON(!sent);
- BUG_ON(*seg >= (*bio_iter)->bi_vcnt);
+ con->out_msg_pos.data_pos += sent;
+ con->out_msg_pos.page_pos += sent;
+ if (sent < len)
+ return;
- (*seg)++;
- if (*seg == (*bio_iter)->bi_vcnt)
- init_bio_iter((*bio_iter)->bi_next, bio_iter, seg);
-}
+ BUG_ON(sent != len);
+ con->out_msg_pos.page_pos = 0;
+ con->out_msg_pos.page++;
+ con->out_msg_pos.did_page_crc = false;
+ if (in_trail)
+ list_move_tail(&page->lru,
+ &msg->trail->head);
+ else if (msg->pagelist)
+ list_move_tail(&page->lru,
+ &msg->pagelist->head);
+#ifdef CONFIG_BLOCK
+ else if (msg->bio)
+ iter_bio_next(&msg->bio_iter, &msg->bio_seg);
#endif
+}
/*
* Write as much message data payload as we can. If we finish, queue
@@ -837,41 +1015,36 @@ static int write_partial_msg_pages(struct ceph_connection *con)
bool do_datacrc = !con->msgr->nocrc;
int ret;
int total_max_write;
- int in_trail = 0;
- size_t trail_len = (msg->trail ? msg->trail->length : 0);
+ bool in_trail = false;
+ const size_t trail_len = (msg->trail ? msg->trail->length : 0);
+ const size_t trail_off = data_len - trail_len;
dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n",
- con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages,
+ con, msg, con->out_msg_pos.page, msg->nr_pages,
con->out_msg_pos.page_pos);
-#ifdef CONFIG_BLOCK
- if (msg->bio && !msg->bio_iter)
- init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg);
-#endif
-
+ /*
+ * Iterate through each page that contains data to be
+ * written, and send as much as possible for each.
+ *
+ * If we are calculating the data crc (the default), we will
+ * need to map the page. If we have no pages, they have
+ * been revoked, so use the zero page.
+ */
while (data_len > con->out_msg_pos.data_pos) {
struct page *page = NULL;
int max_write = PAGE_SIZE;
int bio_offset = 0;
- total_max_write = data_len - trail_len -
- con->out_msg_pos.data_pos;
-
- /*
- * if we are calculating the data crc (the default), we need
- * to map the page. if our pages[] has been revoked, use the
- * zero page.
- */
-
- /* have we reached the trail part of the data? */
- if (con->out_msg_pos.data_pos >= data_len - trail_len) {
- in_trail = 1;
+ in_trail = in_trail || con->out_msg_pos.data_pos >= trail_off;
+ if (!in_trail)
+ total_max_write = trail_off - con->out_msg_pos.data_pos;
+ if (in_trail) {
total_max_write = data_len - con->out_msg_pos.data_pos;
page = list_first_entry(&msg->trail->head,
struct page, lru);
- max_write = PAGE_SIZE;
} else if (msg->pages) {
page = msg->pages[con->out_msg_pos.page];
} else if (msg->pagelist) {
@@ -894,15 +1067,14 @@ static int write_partial_msg_pages(struct ceph_connection *con)
if (do_datacrc && !con->out_msg_pos.did_page_crc) {
void *base;
- u32 crc;
- u32 tmpcrc = le32_to_cpu(con->out_msg->footer.data_crc);
+ u32 crc = le32_to_cpu(msg->footer.data_crc);
char *kaddr;
kaddr = kmap(page);
BUG_ON(kaddr == NULL);
base = kaddr + con->out_msg_pos.page_pos + bio_offset;
- crc = crc32c(tmpcrc, base, len);
- con->out_msg->footer.data_crc = cpu_to_le32(crc);
+ crc = crc32c(crc, base, len);
+ msg->footer.data_crc = cpu_to_le32(crc);
con->out_msg_pos.did_page_crc = true;
}
ret = ceph_tcp_sendpage(con->sock, page,
@@ -915,31 +1087,15 @@ static int write_partial_msg_pages(struct ceph_connection *con)
if (ret <= 0)
goto out;
- con->out_msg_pos.data_pos += ret;
- con->out_msg_pos.page_pos += ret;
- if (ret == len) {
- con->out_msg_pos.page_pos = 0;
- con->out_msg_pos.page++;
- con->out_msg_pos.did_page_crc = false;
- if (in_trail)
- list_move_tail(&page->lru,
- &msg->trail->head);
- else if (msg->pagelist)
- list_move_tail(&page->lru,
- &msg->pagelist->head);
-#ifdef CONFIG_BLOCK
- else if (msg->bio)
- iter_bio_next(&msg->bio_iter, &msg->bio_seg);
-#endif
- }
+ out_msg_pos_next(con, page, len, (size_t) ret, in_trail);
}
dout("write_partial_msg_pages %p msg %p done\n", con, msg);
/* prepare and queue up footer, too */
if (!do_datacrc)
- con->out_msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
- ceph_con_out_kvec_reset(con);
+ msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
+ con_out_kvec_reset(con);
prepare_write_message_footer(con);
ret = 1;
out:
@@ -1351,20 +1507,14 @@ static int process_banner(struct ceph_connection *con)
ceph_pr_addr(&con->msgr->inst.addr.in_addr));
}
- set_bit(NEGOTIATING, &con->state);<