Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph...
[~shefty/rdma-dev.git] / net / ceph / messenger.c
index 10255e81be79d84c5428504c2a79be3180626f79..b9796750034afc093c172dc9ac92b2e864d87271 100644 (file)
  * the sender.
  */
 
+/*
+ * We track the state of the socket on a given connection using
+ * values defined below.  The transition to a new socket state is
+ * handled by a function which verifies we aren't coming from an
+ * unexpected state.
+ *
+ *      --------
+ *      | NEW* |  transient initial state
+ *      --------
+ *          | con_sock_state_init()
+ *          v
+ *      ----------
+ *      | CLOSED |  initialized, but no socket (and no
+ *      ----------  TCP connection)
+ *       ^      \
+ *       |       \ con_sock_state_connecting()
+ *       |        ----------------------
+ *       |                              \
+ *       + con_sock_state_closed()       \
+ *       |+---------------------------    \
+ *       | \                          \    \
+ *       |  -----------                \    \
+ *       |  | CLOSING |  socket event;  \    \
+ *       |  -----------  await close     \    \
+ *       |       ^                        \   |
+ *       |       |                         \  |
+ *       |       + con_sock_state_closing() \ |
+ *       |      / \                         | |
+ *       |     /   ---------------          | |
+ *       |    /                   \         v v
+ *       |   /                    --------------
+ *       |  /    -----------------| CONNECTING |  socket created, TCP
+ *       |  |   /                 --------------  connect initiated
+ *       |  |   | con_sock_state_connected()
+ *       |  |   v
+ *      -------------
+ *      | CONNECTED |  TCP connection established
+ *      -------------
+ *
+ * Values for con->sock_state.  NEW must be 0: ceph_con_init() zeroes con.
+ */
+
+#define CON_SOCK_STATE_NEW             0       /* -> CLOSED */
+#define CON_SOCK_STATE_CLOSED          1       /* -> CONNECTING */
+#define CON_SOCK_STATE_CONNECTING      2       /* -> CONNECTED, CLOSING, CLOSED */
+#define CON_SOCK_STATE_CONNECTED       3       /* -> CLOSING or -> CLOSED */
+#define CON_SOCK_STATE_CLOSING         4       /* -> CLOSED */
+
+/*
+ * connection states
+ */
+#define CON_STATE_CLOSED        1  /* -> PREOPEN */
+#define CON_STATE_PREOPEN       2  /* -> CONNECTING, CLOSED */
+#define CON_STATE_CONNECTING    3  /* -> NEGOTIATING, CLOSED */
+#define CON_STATE_NEGOTIATING   4  /* -> OPEN, CLOSED */
+#define CON_STATE_OPEN          5  /* -> STANDBY, CLOSED */
+#define CON_STATE_STANDBY       6  /* -> PREOPEN, CLOSED */
+
+/*
+ * ceph_connection flag bits
+ */
+#define CON_FLAG_LOSSYTX           0  /* we can close channel or drop
+                                      * messages on errors */
+#define CON_FLAG_KEEPALIVE_PENDING 1  /* we need to send a keepalive */
+#define CON_FLAG_WRITE_PENDING    2  /* we have data ready to send */
+#define CON_FLAG_SOCK_CLOSED      3  /* socket state changed to closed */
+#define CON_FLAG_BACKOFF           4  /* need to retry queuing delayed work */
+
 /* static tag bytes (protocol control messages) */
 static char tag_msg = CEPH_MSGR_TAG_MSG;
 static char tag_ack = CEPH_MSGR_TAG_ACK;
@@ -147,72 +215,130 @@ void ceph_msgr_flush(void)
 }
 EXPORT_SYMBOL(ceph_msgr_flush);
 
+/* Connection socket state transition functions */
+
+static void con_sock_state_init(struct ceph_connection *con)
+{
+       int old_state;
+
+       old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED);
+       if (WARN_ON(old_state != CON_SOCK_STATE_NEW))
+               printk("%s: unexpected old state %d\n", __func__, old_state);
+       dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
+            CON_SOCK_STATE_CLOSED);
+}
+
+static void con_sock_state_connecting(struct ceph_connection *con)
+{
+       int old_state;
+
+       old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTING);
+       if (WARN_ON(old_state != CON_SOCK_STATE_CLOSED))
+               printk("%s: unexpected old state %d\n", __func__, old_state);
+       dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
+            CON_SOCK_STATE_CONNECTING);
+}
+
+static void con_sock_state_connected(struct ceph_connection *con)
+{
+       int old_state;
+
+       old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTED);
+       if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTING))
+               printk("%s: unexpected old state %d\n", __func__, old_state);
+       dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
+            CON_SOCK_STATE_CONNECTED);
+}
+
+static void con_sock_state_closing(struct ceph_connection *con)
+{
+       int old_state;
+
+       old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSING);
+       if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTING &&
+                       old_state != CON_SOCK_STATE_CONNECTED &&
+                       old_state != CON_SOCK_STATE_CLOSING))
+               printk("%s: unexpected old state %d\n", __func__, old_state);
+       dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
+            CON_SOCK_STATE_CLOSING);
+}
+
+static void con_sock_state_closed(struct ceph_connection *con)
+{
+       int old_state;
+
+       old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED);
+       if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTED &&
+                   old_state != CON_SOCK_STATE_CLOSING &&
+                   old_state != CON_SOCK_STATE_CONNECTING &&
+                   old_state != CON_SOCK_STATE_CLOSED))
+               printk("%s: unexpected old state %d\n", __func__, old_state);
+       dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
+            CON_SOCK_STATE_CLOSED);
+}
 
 /*
  * socket callback functions
  */
 
 /* data available on socket, or listen socket received a connect */
-static void ceph_data_ready(struct sock *sk, int count_unused)
+static void ceph_sock_data_ready(struct sock *sk, int count_unused)
 {
        struct ceph_connection *con = sk->sk_user_data;
+       if (atomic_read(&con->msgr->stopping)) {
+               return;
+       }
 
        if (sk->sk_state != TCP_CLOSE_WAIT) {
-               dout("ceph_data_ready on %p state = %lu, queueing work\n",
+               dout("%s on %p state = %lu, queueing work\n", __func__,
                     con, con->state);
                queue_con(con);
        }
 }
 
 /* socket has buffer space for writing */
-static void ceph_write_space(struct sock *sk)
+static void ceph_sock_write_space(struct sock *sk)
 {
        struct ceph_connection *con = sk->sk_user_data;
 
        /* only queue to workqueue if there is data we want to write,
         * and there is sufficient space in the socket buffer to accept
-        * more data.  clear SOCK_NOSPACE so that ceph_write_space()
+        * more data.  clear SOCK_NOSPACE so that ceph_sock_write_space()
         * doesn't get called again until try_write() fills the socket
         * buffer. See net/ipv4/tcp_input.c:tcp_check_space()
         * and net/core/stream.c:sk_stream_write_space().
         */
-       if (test_bit(WRITE_PENDING, &con->state)) {
+       if (test_bit(CON_FLAG_WRITE_PENDING, &con->flags)) {
                if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) {
-                       dout("ceph_write_space %p queueing write work\n", con);
+                       dout("%s %p queueing write work\n", __func__, con);
                        clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
                        queue_con(con);
                }
        } else {
-               dout("ceph_write_space %p nothing to write\n", con);
+               dout("%s %p nothing to write\n", __func__, con);
        }
 }
 
 /* socket's state has changed */
-static void ceph_state_change(struct sock *sk)
+static void ceph_sock_state_change(struct sock *sk)
 {
        struct ceph_connection *con = sk->sk_user_data;
 
-       dout("ceph_state_change %p state = %lu sk_state = %u\n",
+       dout("%s %p state = %lu sk_state = %u\n", __func__,
             con, con->state, sk->sk_state);
 
-       if (test_bit(CLOSED, &con->state))
-               return;
-
        switch (sk->sk_state) {
        case TCP_CLOSE:
-               dout("ceph_state_change TCP_CLOSE\n");
+               dout("%s TCP_CLOSE\n", __func__);
        case TCP_CLOSE_WAIT:
-               dout("ceph_state_change TCP_CLOSE_WAIT\n");
-               if (test_and_set_bit(SOCK_CLOSED, &con->state) == 0) {
-                       if (test_bit(CONNECTING, &con->state))
-                               con->error_msg = "connection failed";
-                       else
-                               con->error_msg = "socket closed";
-                       queue_con(con);
-               }
+               dout("%s TCP_CLOSE_WAIT\n", __func__);
+               con_sock_state_closing(con);
+               set_bit(CON_FLAG_SOCK_CLOSED, &con->flags);
+               queue_con(con);
                break;
        case TCP_ESTABLISHED:
-               dout("ceph_state_change TCP_ESTABLISHED\n");
+               dout("%s TCP_ESTABLISHED\n", __func__);
+               con_sock_state_connected(con);
                queue_con(con);
                break;
        default:        /* Everything else is uninteresting */
@@ -228,9 +354,9 @@ static void set_sock_callbacks(struct socket *sock,
 {
        struct sock *sk = sock->sk;
        sk->sk_user_data = con;
-       sk->sk_data_ready = ceph_data_ready;
-       sk->sk_write_space = ceph_write_space;
-       sk->sk_state_change = ceph_state_change;
+       sk->sk_data_ready = ceph_sock_data_ready;
+       sk->sk_write_space = ceph_sock_write_space;
+       sk->sk_state_change = ceph_sock_state_change;
 }
 
 
@@ -262,6 +388,7 @@ static int ceph_tcp_connect(struct ceph_connection *con)
 
        dout("connect %s\n", ceph_pr_addr(&con->peer_addr.in_addr));
 
+       con_sock_state_connecting(con);
        ret = sock->ops->connect(sock, (struct sockaddr *)paddr, sizeof(*paddr),
                                 O_NONBLOCK);
        if (ret == -EINPROGRESS) {
@@ -277,7 +404,6 @@ static int ceph_tcp_connect(struct ceph_connection *con)
                return ret;
        }
        con->sock = sock;
-
        return 0;
 }
 
@@ -333,16 +459,24 @@ static int ceph_tcp_sendpage(struct socket *sock, struct page *page,
  */
 static int con_close_socket(struct ceph_connection *con)
 {
-       int rc;
+       int rc = 0;
 
        dout("con_close_socket on %p sock %p\n", con, con->sock);
-       if (!con->sock)
-               return 0;
-       set_bit(SOCK_CLOSED, &con->state);
-       rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR);
-       sock_release(con->sock);
-       con->sock = NULL;
-       clear_bit(SOCK_CLOSED, &con->state);
+       if (con->sock) {
+               rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR);
+               sock_release(con->sock);
+               con->sock = NULL;
+       }
+
+       /*
+        * Forcibly clear the SOCK_CLOSED flag.  It gets set
+        * independent of the connection mutex, and we could have
+        * received a socket close event before we had the chance to
+        * shut the socket down.
+        */
+       clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags);
+
+       con_sock_state_closed(con);
        return rc;
 }
 
@@ -353,6 +487,10 @@ static int con_close_socket(struct ceph_connection *con)
 static void ceph_msg_remove(struct ceph_msg *msg)
 {
        list_del_init(&msg->list_head);
+       BUG_ON(msg->con == NULL);
+       msg->con->ops->put(msg->con);
+       msg->con = NULL;
+
        ceph_msg_put(msg);
 }
 static void ceph_msg_remove_list(struct list_head *head)
@@ -372,8 +510,11 @@ static void reset_connection(struct ceph_connection *con)
        ceph_msg_remove_list(&con->out_sent);
 
        if (con->in_msg) {
+               BUG_ON(con->in_msg->con != con);
+               con->in_msg->con = NULL;
                ceph_msg_put(con->in_msg);
                con->in_msg = NULL;
+               con->ops->put(con);
        }
 
        con->connect_seq = 0;
@@ -391,32 +532,44 @@ static void reset_connection(struct ceph_connection *con)
  */
 void ceph_con_close(struct ceph_connection *con)
 {
+       mutex_lock(&con->mutex);
        dout("con_close %p peer %s\n", con,
             ceph_pr_addr(&con->peer_addr.in_addr));
-       set_bit(CLOSED, &con->state);  /* in case there's queued work */
-       clear_bit(STANDBY, &con->state);  /* avoid connect_seq bump */
-       clear_bit(LOSSYTX, &con->state);  /* so we retry next connect */
-       clear_bit(KEEPALIVE_PENDING, &con->state);
-       clear_bit(WRITE_PENDING, &con->state);
-       mutex_lock(&con->mutex);
+       con->state = CON_STATE_CLOSED;
+
+       clear_bit(CON_FLAG_LOSSYTX, &con->flags); /* so we retry next connect */
+       clear_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags);
+       clear_bit(CON_FLAG_WRITE_PENDING, &con->flags);
+       clear_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags);
+       clear_bit(CON_FLAG_BACKOFF, &con->flags);
+
        reset_connection(con);
        con->peer_global_seq = 0;
        cancel_delayed_work(&con->work);
+       con_close_socket(con);
        mutex_unlock(&con->mutex);
-       queue_con(con);
 }
 EXPORT_SYMBOL(ceph_con_close);
 
 /*
  * Reopen a closed connection, with a new peer address.
  */
-void ceph_con_open(struct ceph_connection *con, struct ceph_entity_addr *addr)
+void ceph_con_open(struct ceph_connection *con,
+                  __u8 entity_type, __u64 entity_num,
+                  struct ceph_entity_addr *addr)
 {
+       mutex_lock(&con->mutex);
        dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr));
-       set_bit(OPENING, &con->state);
-       clear_bit(CLOSED, &con->state);
+
+       BUG_ON(con->state != CON_STATE_CLOSED);
+       con->state = CON_STATE_PREOPEN;
+
+       con->peer_name.type = (__u8) entity_type;
+       con->peer_name.num = cpu_to_le64(entity_num);
+
        memcpy(&con->peer_addr, addr, sizeof(*addr));
        con->delay = 0;      /* reset backoff memory */
+       mutex_unlock(&con->mutex);
        queue_con(con);
 }
 EXPORT_SYMBOL(ceph_con_open);
@@ -429,43 +582,27 @@ bool ceph_con_opened(struct ceph_connection *con)
        return con->connect_seq > 0;
 }
 
-/*
- * generic get/put
- */
-struct ceph_connection *ceph_con_get(struct ceph_connection *con)
-{
-       int nref = __atomic_add_unless(&con->nref, 1, 0);
-
-       dout("con_get %p nref = %d -> %d\n", con, nref, nref + 1);
-
-       return nref ? con : NULL;
-}
-
-void ceph_con_put(struct ceph_connection *con)
-{
-       int nref = atomic_dec_return(&con->nref);
-
-       BUG_ON(nref < 0);
-       if (nref == 0) {
-               BUG_ON(con->sock);
-               kfree(con);
-       }
-       dout("con_put %p nref = %d -> %d\n", con, nref + 1, nref);
-}
-
 /*
  * initialize a new connection.
  */
-void ceph_con_init(struct ceph_messenger *msgr, struct ceph_connection *con)
+void ceph_con_init(struct ceph_connection *con, void *private,
+       const struct ceph_connection_operations *ops,
+       struct ceph_messenger *msgr)
 {
        dout("con_init %p\n", con);
        memset(con, 0, sizeof(*con));
-       atomic_set(&con->nref, 1);
+       con->private = private;
+       con->ops = ops;
        con->msgr = msgr;
+
+       con_sock_state_init(con);
+
        mutex_init(&con->mutex);
        INIT_LIST_HEAD(&con->out_queue);
        INIT_LIST_HEAD(&con->out_sent);
        INIT_DELAYED_WORK(&con->work, con_work);
+
+       con->state = CON_STATE_CLOSED;
 }
 EXPORT_SYMBOL(ceph_con_init);
 
@@ -486,14 +623,14 @@ static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt)
        return ret;
 }
 
-static void ceph_con_out_kvec_reset(struct ceph_connection *con)
+static void con_out_kvec_reset(struct ceph_connection *con)
 {
        con->out_kvec_left = 0;
        con->out_kvec_bytes = 0;
        con->out_kvec_cur = &con->out_kvec[0];
 }
 
-static void ceph_con_out_kvec_add(struct ceph_connection *con,
+static void con_out_kvec_add(struct ceph_connection *con,
                                size_t size, void *data)
 {
        int index;
@@ -507,6 +644,53 @@ static void ceph_con_out_kvec_add(struct ceph_connection *con,
        con->out_kvec_bytes += size;
 }
 
+#ifdef CONFIG_BLOCK
+static void init_bio_iter(struct bio *bio, struct bio **iter, int *seg)
+{
+       if (!bio) {
+               *iter = NULL;
+               *seg = 0;
+               return;
+       }
+       *iter = bio;
+       *seg = bio->bi_idx;
+}
+
+static void iter_bio_next(struct bio **bio_iter, int *seg)
+{
+       if (*bio_iter == NULL)
+               return;
+
+       BUG_ON(*seg >= (*bio_iter)->bi_vcnt);
+
+       (*seg)++;
+       if (*seg == (*bio_iter)->bi_vcnt)
+               init_bio_iter((*bio_iter)->bi_next, bio_iter, seg);
+}
+#endif
+
+static void prepare_write_message_data(struct ceph_connection *con)
+{
+       struct ceph_msg *msg = con->out_msg;
+
+       BUG_ON(!msg);
+       BUG_ON(!msg->hdr.data_len);
+
+       /* initialize page iterator */
+       con->out_msg_pos.page = 0;
+       if (msg->pages)
+               con->out_msg_pos.page_pos = msg->page_alignment;
+       else
+               con->out_msg_pos.page_pos = 0;
+#ifdef CONFIG_BLOCK
+       if (msg->bio)
+               init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg);
+#endif
+       con->out_msg_pos.data_pos = 0;
+       con->out_msg_pos.did_page_crc = false;
+       con->out_more = 1;  /* data + footer will follow */
+}
+
 /*
  * Prepare footer for currently outgoing message, and finish things
  * off.  Assumes out_kvec* are already valid.. we just add on to the end.
@@ -516,6 +700,8 @@ static void prepare_write_message_footer(struct ceph_connection *con)
        struct ceph_msg *m = con->out_msg;
        int v = con->out_kvec_left;
 
+       m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE;
+
        dout("prepare_write_message_footer %p\n", con);
        con->out_kvec_is_msg = true;
        con->out_kvec[v].iov_base = &m->footer;
@@ -534,7 +720,7 @@ static void prepare_write_message(struct ceph_connection *con)
        struct ceph_msg *m;
        u32 crc;
 
-       ceph_con_out_kvec_reset(con);
+       con_out_kvec_reset(con);
        con->out_kvec_is_msg = true;
        con->out_msg_done = false;
 
@@ -542,14 +728,16 @@ static void prepare_write_message(struct ceph_connection *con)
         * TCP packet that's a good thing. */
        if (con->in_seq > con->in_seq_acked) {
                con->in_seq_acked = con->in_seq;
-               ceph_con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
+               con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
                con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
-               ceph_con_out_kvec_add(con, sizeof (con->out_temp_ack),
+               con_out_kvec_add(con, sizeof (con->out_temp_ack),
                        &con->out_temp_ack);
        }
 
+       BUG_ON(list_empty(&con->out_queue));
        m = list_first_entry(&con->out_queue, struct ceph_msg, list_head);
        con->out_msg = m;
+       BUG_ON(m->con != con);
 
        /* put message on sent list */
        ceph_msg_get(m);
@@ -576,18 +764,18 @@ static void prepare_write_message(struct ceph_connection *con)
        BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len);
 
        /* tag + hdr + front + middle */
-       ceph_con_out_kvec_add(con, sizeof (tag_msg), &tag_msg);
-       ceph_con_out_kvec_add(con, sizeof (m->hdr), &m->hdr);
-       ceph_con_out_kvec_add(con, m->front.iov_len, m->front.iov_base);
+       con_out_kvec_add(con, sizeof (tag_msg), &tag_msg);
+       con_out_kvec_add(con, sizeof (m->hdr), &m->hdr);
+       con_out_kvec_add(con, m->front.iov_len, m->front.iov_base);
 
        if (m->middle)
-               ceph_con_out_kvec_add(con, m->middle->vec.iov_len,
+               con_out_kvec_add(con, m->middle->vec.iov_len,
                        m->middle->vec.iov_base);
 
        /* fill in crc (except data pages), footer */
        crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc));
        con->out_msg->hdr.crc = cpu_to_le32(crc);
-       con->out_msg->footer.flags = CEPH_MSG_FOOTER_COMPLETE;
+       con->out_msg->footer.flags = 0;
 
        crc = crc32c(0, m->front.iov_base, m->front.iov_len);
        con->out_msg->footer.front_crc = cpu_to_le32(crc);
@@ -597,28 +785,19 @@ static void prepare_write_message(struct ceph_connection *con)
                con->out_msg->footer.middle_crc = cpu_to_le32(crc);
        } else
                con->out_msg->footer.middle_crc = 0;
-       con->out_msg->footer.data_crc = 0;
-       dout("prepare_write_message front_crc %u data_crc %u\n",
+       dout("%s front_crc %u middle_crc %u\n", __func__,
             le32_to_cpu(con->out_msg->footer.front_crc),
             le32_to_cpu(con->out_msg->footer.middle_crc));
 
        /* is there a data payload? */
-       if (le32_to_cpu(m->hdr.data_len) > 0) {
-               /* initialize page iterator */
-               con->out_msg_pos.page = 0;
-               if (m->pages)
-                       con->out_msg_pos.page_pos = m->page_alignment;
-               else
-                       con->out_msg_pos.page_pos = 0;
-               con->out_msg_pos.data_pos = 0;
-               con->out_msg_pos.did_page_crc = false;
-               con->out_more = 1;  /* data + footer will follow */
-       } else {
+       con->out_msg->footer.data_crc = 0;
+       if (m->hdr.data_len)
+               prepare_write_message_data(con);
+       else
                /* no, queue up footer too and be done */
                prepare_write_message_footer(con);
-       }
 
-       set_bit(WRITE_PENDING, &con->state);
+       set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
 }
 
 /*
@@ -630,16 +809,16 @@ static void prepare_write_ack(struct ceph_connection *con)
             con->in_seq_acked, con->in_seq);
        con->in_seq_acked = con->in_seq;
 
-       ceph_con_out_kvec_reset(con);
+       con_out_kvec_reset(con);
 
-       ceph_con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
+       con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
 
        con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
-       ceph_con_out_kvec_add(con, sizeof (con->out_temp_ack),
+       con_out_kvec_add(con, sizeof (con->out_temp_ack),
                                &con->out_temp_ack);
 
        con->out_more = 1;  /* more will follow.. eventually.. */
-       set_bit(WRITE_PENDING, &con->state);
+       set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
 }
 
 /*
@@ -648,9 +827,9 @@ static void prepare_write_ack(struct ceph_connection *con)
 static void prepare_write_keepalive(struct ceph_connection *con)
 {
        dout("prepare_write_keepalive %p\n", con);
-       ceph_con_out_kvec_reset(con);
-       ceph_con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive);
-       set_bit(WRITE_PENDING, &con->state);
+       con_out_kvec_reset(con);
+       con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive);
+       set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
 }
 
 /*
@@ -665,27 +844,21 @@ static struct ceph_auth_handshake *get_connect_authorizer(struct ceph_connection
        if (!con->ops->get_authorizer) {
                con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN;
                con->out_connect.authorizer_len = 0;
-
                return NULL;
        }
 
        /* Can't hold the mutex while getting authorizer */
-
        mutex_unlock(&con->mutex);
-
        auth = con->ops->get_authorizer(con, auth_proto, con->auth_retry);
-
        mutex_lock(&con->mutex);
 
        if (IS_ERR(auth))
                return auth;
-       if (test_bit(CLOSED, &con->state) || test_bit(OPENING, &con->state))
+       if (con->state != CON_STATE_NEGOTIATING)
                return ERR_PTR(-EAGAIN);
 
        con->auth_reply_buf = auth->authorizer_reply_buf;
        con->auth_reply_buf_len = auth->authorizer_reply_buf_len;
-
-
        return auth;
 }
 
@@ -694,12 +867,12 @@ static struct ceph_auth_handshake *get_connect_authorizer(struct ceph_connection
  */
 static void prepare_write_banner(struct ceph_connection *con)
 {
-       ceph_con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER);
-       ceph_con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr),
+       con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER);
+       con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr),
                                        &con->msgr->my_enc_addr);
 
        con->out_more = 0;
-       set_bit(WRITE_PENDING, &con->state);
+       set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
 }
 
 static int prepare_write_connect(struct ceph_connection *con)
@@ -742,14 +915,15 @@ static int prepare_write_connect(struct ceph_connection *con)
        con->out_connect.authorizer_len = auth ?
                cpu_to_le32(auth->authorizer_buf_len) : 0;
 
-       ceph_con_out_kvec_add(con, sizeof (con->out_connect),
+       con_out_kvec_reset(con);
+       con_out_kvec_add(con, sizeof (con->out_connect),
                                        &con->out_connect);
        if (auth && auth->authorizer_buf_len)
-               ceph_con_out_kvec_add(con, auth->authorizer_buf_len,
+               con_out_kvec_add(con, auth->authorizer_buf_len,
                                        auth->authorizer_buf);
 
        con->out_more = 0;
-       set_bit(WRITE_PENDING, &con->state);
+       set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
 
        return 0;
 }
@@ -797,30 +971,34 @@ out:
        return ret;  /* done! */
 }
 
-#ifdef CONFIG_BLOCK
-static void init_bio_iter(struct bio *bio, struct bio **iter, int *seg)
+static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
+                       size_t len, size_t sent, bool in_trail)
 {
-       if (!bio) {
-               *iter = NULL;
-               *seg = 0;
-               return;
-       }
-       *iter = bio;
-       *seg = bio->bi_idx;
-}
+       struct ceph_msg *msg = con->out_msg;
 
-static void iter_bio_next(struct bio **bio_iter, int *seg)
-{
-       if (*bio_iter == NULL)
-               return;
+       BUG_ON(!msg);
+       BUG_ON(!sent);
 
-       BUG_ON(*seg >= (*bio_iter)->bi_vcnt);
+       con->out_msg_pos.data_pos += sent;
+       con->out_msg_pos.page_pos += sent;
+       if (sent < len)
+               return;
 
-       (*seg)++;
-       if (*seg == (*bio_iter)->bi_vcnt)
-               init_bio_iter((*bio_iter)->bi_next, bio_iter, seg);
-}
+       BUG_ON(sent != len);
+       con->out_msg_pos.page_pos = 0;
+       con->out_msg_pos.page++;
+       con->out_msg_pos.did_page_crc = false;
+       if (in_trail)
+               list_move_tail(&page->lru,
+                              &msg->trail->head);
+       else if (msg->pagelist)
+               list_move_tail(&page->lru,
+                              &msg->pagelist->head);
+#ifdef CONFIG_BLOCK
+       else if (msg->bio)
+               iter_bio_next(&msg->bio_iter, &msg->bio_seg);
 #endif
+}
 
 /*
  * Write as much message data payload as we can.  If we finish, queue
@@ -837,41 +1015,36 @@ static int write_partial_msg_pages(struct ceph_connection *con)
        bool do_datacrc = !con->msgr->nocrc;
        int ret;
        int total_max_write;
-       int in_trail = 0;
-       size_t trail_len = (msg->trail ? msg->trail->length : 0);
+       bool in_trail = false;
+       const size_t trail_len = (msg->trail ? msg->trail->length : 0);
+       const size_t trail_off = data_len - trail_len;
 
        dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n",
-            con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages,
+            con, msg, con->out_msg_pos.page, msg->nr_pages,
             con->out_msg_pos.page_pos);
 
-#ifdef CONFIG_BLOCK
-       if (msg->bio && !msg->bio_iter)
-               init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg);
-#endif
-
+       /*
+        * Iterate through each page that contains data to be
+        * written, and send as much as possible for each.
+        *
+        * If we are calculating the data crc (the default), we will
+        * need to map the page.  If we have no pages, they have
+        * been revoked, so use the zero page.
+        */
        while (data_len > con->out_msg_pos.data_pos) {
                struct page *page = NULL;
                int max_write = PAGE_SIZE;
                int bio_offset = 0;
 
-               total_max_write = data_len - trail_len -
-                       con->out_msg_pos.data_pos;
-
-               /*
-                * if we are calculating the data crc (the default), we need
-                * to map the page.  if our pages[] has been revoked, use the
-                * zero page.
-                */
-
-               /* have we reached the trail part of the data? */
-               if (con->out_msg_pos.data_pos >= data_len - trail_len) {
-                       in_trail = 1;
+               in_trail = in_trail || con->out_msg_pos.data_pos >= trail_off;
+               if (!in_trail)
+                       total_max_write = trail_off - con->out_msg_pos.data_pos;
 
+               if (in_trail) {
                        total_max_write = data_len - con->out_msg_pos.data_pos;
 
                        page = list_first_entry(&msg->trail->head,
                                                struct page, lru);
-                       max_write = PAGE_SIZE;
                } else if (msg->pages) {
                        page = msg->pages[con->out_msg_pos.page];
                } else if (msg->pagelist) {
@@ -894,15 +1067,14 @@ static int write_partial_msg_pages(struct ceph_connection *con)
 
                if (do_datacrc && !con->out_msg_pos.did_page_crc) {
                        void *base;
-                       u32 crc;
-                       u32 tmpcrc = le32_to_cpu(con->out_msg->footer.data_crc);
+                       u32 crc = le32_to_cpu(msg->footer.data_crc);
                        char *kaddr;
 
                        kaddr = kmap(page);
                        BUG_ON(kaddr == NULL);
                        base = kaddr + con->out_msg_pos.page_pos + bio_offset;
-                       crc = crc32c(tmpcrc, base, len);
-                       con->out_msg->footer.data_crc = cpu_to_le32(crc);
+                       crc = crc32c(crc, base, len);
+                       msg->footer.data_crc = cpu_to_le32(crc);
                        con->out_msg_pos.did_page_crc = true;
                }
                ret = ceph_tcp_sendpage(con->sock, page,
@@ -915,31 +1087,15 @@ static int write_partial_msg_pages(struct ceph_connection *con)
                if (ret <= 0)
                        goto out;
 
-               con->out_msg_pos.data_pos += ret;
-               con->out_msg_pos.page_pos += ret;
-               if (ret == len) {
-                       con->out_msg_pos.page_pos = 0;
-                       con->out_msg_pos.page++;
-                       con->out_msg_pos.did_page_crc = false;
-                       if (in_trail)
-                               list_move_tail(&page->lru,
-                                              &msg->trail->head);
-                       else if (msg->pagelist)
-                               list_move_tail(&page->lru,
-                                              &msg->pagelist->head);
-#ifdef CONFIG_BLOCK
-                       else if (msg->bio)
-                               iter_bio_next(&msg->bio_iter, &msg->bio_seg);
-#endif
-               }
+               out_msg_pos_next(con, page, len, (size_t) ret, in_trail);
        }
 
        dout("write_partial_msg_pages %p msg %p done\n", con, msg);
 
        /* prepare and queue up footer, too */
        if (!do_datacrc)
-               con->out_msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
-       ceph_con_out_kvec_reset(con);
+               msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
+       con_out_kvec_reset(con);
        prepare_write_message_footer(con);
        ret = 1;
 out:
@@ -1351,20 +1507,14 @@ static int process_banner(struct ceph_connection *con)
                     ceph_pr_addr(&con->msgr->inst.addr.in_addr));
        }
 
-       set_bit(NEGOTIATING, &con->state);
-       prepare_read_connect(con);
        return 0;
 }
 
 static void fail_protocol(struct ceph_connection *con)
 {
        reset_connection(con);
-       set_bit(CLOSED, &con->state);  /* in case there's queued work */
-
-       mutex_unlock(&con->mutex);
-       if (con->ops->bad_proto)
-               con->ops->bad_proto(con);
-       mutex_lock(&con->mutex);
+       BUG_ON(con->state != CON_STATE_NEGOTIATING);
+       con->state = CON_STATE_CLOSED;
 }
 
 static int process_connect(struct ceph_connection *con)
@@ -1407,7 +1557,6 @@ static int process_connect(struct ceph_connection *con)
                        return -1;
                }
                con->auth_retry = 1;
-               ceph_con_out_kvec_reset(con);
                ret = prepare_write_connect(con);
                if (ret < 0)
                        return ret;
@@ -1428,7 +1577,6 @@ static int process_connect(struct ceph_connection *con)
                       ENTITY_NAME(con->peer_name),
                       ceph_pr_addr(&con->peer_addr.in_addr));
                reset_connection(con);
-               ceph_con_out_kvec_reset(con);
                ret = prepare_write_connect(con);
                if (ret < 0)
                        return ret;
@@ -1440,8 +1588,7 @@ static int process_connect(struct ceph_connection *con)
                if (con->ops->peer_reset)
                        con->ops->peer_reset(con);
                mutex_lock(&con->mutex);
-               if (test_bit(CLOSED, &con->state) ||
-                   test_bit(OPENING, &con->state))
+               if (con->state != CON_STATE_NEGOTIATING)
                        return -EAGAIN;
                break;
 
@@ -1454,7 +1601,6 @@ static int process_connect(struct ceph_connection *con)
                     le32_to_cpu(con->out_connect.connect_seq),
                     le32_to_cpu(con->in_reply.connect_seq));
                con->connect_seq = le32_to_cpu(con->in_reply.connect_seq);
-               ceph_con_out_kvec_reset(con);
                ret = prepare_write_connect(con);
                if (ret < 0)
                        return ret;
@@ -1471,7 +1617,6 @@ static int process_connect(struct ceph_connection *con)
                     le32_to_cpu(con->in_reply.global_seq));
                get_global_seq(con->msgr,
                               le32_to_cpu(con->in_reply.global_seq));
-               ceph_con_out_kvec_reset(con);
                ret = prepare_write_connect(con);
                if (ret < 0)
                        return ret;
@@ -1489,7 +1634,10 @@ static int process_connect(struct ceph_connection *con)
                        fail_protocol(con);
                        return -1;
                }
-               clear_bit(CONNECTING, &con->state);
+
+               BUG_ON(con->state != CON_STATE_NEGOTIATING);
+               con->state = CON_STATE_OPEN;
+
                con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq);
                con->connect_seq++;
                con->peer_features = server_feat;
@@ -1501,7 +1649,9 @@ static int process_connect(struct ceph_connection *con)
                        le32_to_cpu(con->in_reply.connect_seq));
 
                if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY)
-                       set_bit(LOSSYTX, &con->state);
+                       set_bit(CON_FLAG_LOSSYTX, &con->flags);
+
+               con->delay = 0;      /* reset backoff memory */
 
                prepare_read_tag(con);
                break;
@@ -1587,10 +1737,7 @@ static int read_partial_message_section(struct ceph_connection *con,
        return 1;
 }
 
-static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
-                               struct ceph_msg_header *hdr,
-                               int *skip);
-
+static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip);
 
 static int read_partial_message_pages(struct ceph_connection *con,
                                      struct page **pages,
@@ -1633,9 +1780,6 @@ static int read_partial_message_bio(struct ceph_connection *con,
        void *p;
        int ret, left;
 
-       if (IS_ERR(bv))
-               return PTR_ERR(bv);
-
        left = min((int)(data_len - con->in_msg_pos.data_pos),
                   (int)(bv->bv_len - con->in_msg_pos.page_pos));
 
@@ -1672,7 +1816,6 @@ static int read_partial_message(struct ceph_connection *con)
        int ret;
        unsigned int front_len, middle_len, data_len;
        bool do_datacrc = !con->msgr->nocrc;
-       int skip;
        u64 seq;
        u32 crc;
 
@@ -1723,10 +1866,13 @@ static int read_partial_message(struct ceph_connection *con)
 
        /* allocate message? */
        if (!con->in_msg) {
+               int skip = 0;
+
                dout("got hdr type %d front %d data %d\n", con->in_hdr.type,
                     con->in_hdr.front_len, con->in_hdr.data_len);
-               skip = 0;
-               con->in_msg = ceph_alloc_msg(con, &con->in_hdr, &skip);
+               ret = ceph_con_in_msg_alloc(con, &skip);
+               if (ret < 0)
+                       return ret;
                if (skip) {
                        /* skip this message */
                        dout("alloc_msg said skip message\n");
@@ -1737,11 +1883,9 @@ static int read_partial_message(struct ceph_connection *con)
                        con->in_seq++;
                        return 0;
                }
-               if (!con->in_msg) {
-                       con->error_msg =
-                               "error allocating memory for incoming message";
-                       return -ENOMEM;
-               }
+
+               BUG_ON(!con->in_msg);
+               BUG_ON(con->in_msg->con != con);
                m = con->in_msg;
                m->front.iov_len = 0;    /* haven't read it yet */
                if (m->middle)
@@ -1753,6 +1897,11 @@ static int read_partial_message(struct ceph_connection *con)
                else
                        con->in_msg_pos.page_pos = 0;
                con->in_msg_pos.data_pos = 0;
+
+#ifdef CONFIG_BLOCK
+               if (m->bio)
+                       init_bio_iter(m->bio, &m->bio_iter, &m->bio_seg);
+#endif
        }
 
        /* front */
@@ -1769,10 +1918,6 @@ static int read_partial_message(struct ceph_connection *con)
                if (ret <= 0)
                        return ret;
        }
-#ifdef CONFIG_BLOCK
-       if (m->bio && !m->bio_iter)
-               init_bio_iter(m->bio, &m->bio_iter, &m->bio_seg);
-#endif
 
        /* (page) data */
        while (con->in_msg_pos.data_pos < data_len) {
@@ -1783,7 +1928,7 @@ static int read_partial_message(struct ceph_connection *con)
                                return ret;
 #ifdef CONFIG_BLOCK
                } else if (m->bio) {
-
+                       BUG_ON(!m->bio_iter);
                        ret = read_partial_message_bio(con,
                                                 &m->bio_iter, &m->bio_seg,
                                                 data_len, do_datacrc);
@@ -1837,8 +1982,11 @@ static void process_message(struct ceph_connection *con)
 {
        struct ceph_msg *msg;
 
+       BUG_ON(con->in_msg->con != con);
+       con->in_msg->con = NULL;
        msg = con->in_msg;
        con->in_msg = NULL;
+       con->ops->put(con);
 
        /* if first message, set peer_name */
        if (con->peer_name.type == 0)
@@ -1858,7 +2006,6 @@ static void process_message(struct ceph_connection *con)
        con->ops->dispatch(con, msg);
 
        mutex_lock(&con->mutex);
-       prepare_read_tag(con);
 }
 
 
@@ -1870,22 +2017,19 @@ static int try_write(struct ceph_connection *con)
 {
        int ret = 1;
 
-       dout("try_write start %p state %lu nref %d\n", con, con->state,
-            atomic_read(&con->nref));
+       dout("try_write start %p state %lu\n", con, con->state);
 
 more:
        dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
 
        /* open the socket first? */
-       if (con->sock == NULL) {
-               ceph_con_out_kvec_reset(con);
+       if (con->state == CON_STATE_PREOPEN) {
+               BUG_ON(con->sock);
+               con->state = CON_STATE_CONNECTING;
+
+               con_out_kvec_reset(con);
                prepare_write_banner(con);
-               ret = prepare_write_connect(con);
-               if (ret < 0)
-                       goto out;
                prepare_read_banner(con);
-               set_bit(CONNECTING, &con->state);
-               clear_bit(NEGOTIATING, &con->state);
 
                BUG_ON(con->in_msg);
                con->in_tag = CEPH_MSGR_TAG_READY;
@@ -1932,7 +2076,7 @@ more_kvec:
        }
 
 do_next:
-       if (!test_bit(CONNECTING, &con->state)) {
+       if (con->state == CON_STATE_OPEN) {
                /* is anything else pending? */
                if (!list_empty(&con->out_queue)) {
                        prepare_write_message(con);
@@ -1942,14 +2086,15 @@ do_next:
                        prepare_write_ack(con);
                        goto more;
                }
-               if (test_and_clear_bit(KEEPALIVE_PENDING, &con->state)) {
+               if (test_and_clear_bit(CON_FLAG_KEEPALIVE_PENDING,
+                                      &con->flags)) {
                        prepare_write_keepalive(con);
                        goto more;
                }
        }
 
        /* Nothing to do! */
-       clear_bit(WRITE_PENDING, &con->state);
+       clear_bit(CON_FLAG_WRITE_PENDING, &con->flags);
        dout("try_write nothing else to write.\n");
        ret = 0;
 out:
@@ -1966,38 +2111,42 @@ static int try_read(struct ceph_connection *con)
 {
        int ret = -1;
 
-       if (!con->sock)
-               return 0;
-
-       if (test_bit(STANDBY, &con->state))
+more:
+       dout("try_read start on %p state %lu\n", con, con->state);
+       if (con->state != CON_STATE_CONNECTING &&
+           con->state != CON_STATE_NEGOTIATING &&
+           con->state != CON_STATE_OPEN)
                return 0;
 
-       dout("try_read start on %p\n", con);
+       BUG_ON(!con->sock);
 
-more:
        dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
             con->in_base_pos);
 
-       /*
-        * process_connect and process_message drop and re-take
-        * con->mutex.  make sure we handle a racing close or reopen.
-        */
-       if (test_bit(CLOSED, &con->state) ||
-           test_bit(OPENING, &con->state)) {
-               ret = -EAGAIN;
+       if (con->state == CON_STATE_CONNECTING) {
+               dout("try_read connecting\n");
+               ret = read_partial_banner(con);
+               if (ret <= 0)
+                       goto out;
+               ret = process_banner(con);
+               if (ret < 0)
+                       goto out;
+
+               BUG_ON(con->state != CON_STATE_CONNECTING);
+               con->state = CON_STATE_NEGOTIATING;
+
+               /* Banner is good, exchange connection info */
+               ret = prepare_write_connect(con);
+               if (ret < 0)
+                       goto out;
+               prepare_read_connect(con);
+
+               /* Send connection info before awaiting response */
                goto out;
        }
 
-       if (test_bit(CONNECTING, &con->state)) {
-               if (!test_bit(NEGOTIATING, &con->state)) {
-                       dout("try_read connecting\n");
-                       ret = read_partial_banner(con);
-                       if (ret <= 0)
-                               goto out;
-                       ret = process_banner(con);
-                       if (ret < 0)
-                               goto out;
-               }
+       if (con->state == CON_STATE_NEGOTIATING) {
+               dout("try_read negotiating\n");
                ret = read_partial_connect(con);
                if (ret <= 0)
                        goto out;
@@ -2007,6 +2156,8 @@ more:
                goto more;
        }
 
+       BUG_ON(con->state != CON_STATE_OPEN);
+
        if (con->in_base_pos < 0) {
                /*
                 * skipping + discarding content.
@@ -2040,7 +2191,8 @@ more:
                        prepare_read_ack(con);
                        break;
                case CEPH_MSGR_TAG_CLOSE:
-                       set_bit(CLOSED, &con->state);   /* fixme */
+                       con_close_socket(con);
+                       con->state = CON_STATE_CLOSED;
                        goto out;
                default:
                        goto bad_tag;
@@ -2063,6 +2215,8 @@ more:
                if (con->in_tag == CEPH_MSGR_TAG_READY)
                        goto more;
                process_message(con);
+               if (con->state == CON_STATE_OPEN)
+                       prepare_read_tag(con);
                goto more;
        }
        if (con->in_tag == CEPH_MSGR_TAG_ACK) {
@@ -2091,12 +2245,6 @@ bad_tag:
  */
 static void queue_con(struct ceph_connection *con)
 {
-       if (test_bit(DEAD, &con->state)) {
-               dout("queue_con %p ignoring: DEAD\n",
-                    con);
-               return;
-       }
-
        if (!con->ops->get(con)) {
                dout("queue_con %p ref count 0\n", con);
                return;
@@ -2121,7 +2269,26 @@ static void con_work(struct work_struct *work)
 
        mutex_lock(&con->mutex);
 restart:
-       if (test_and_clear_bit(BACKOFF, &con->state)) {
+       if (test_and_clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags)) {
+               switch (con->state) {
+               case CON_STATE_CONNECTING:
+                       con->error_msg = "connection failed";
+                       break;
+               case CON_STATE_NEGOTIATING:
+                       con->error_msg = "negotiation failed";
+                       break;
+               case CON_STATE_OPEN:
+                       con->error_msg = "socket closed";
+                       break;
+               default:
+                       dout("unrecognized con state %d\n", (int)con->state);
+                       con->error_msg = "unrecognized con state";
+                       BUG();
+               }
+               goto fault;
+       }
+
+       if (test_and_clear_bit(CON_FLAG_BACKOFF, &con->flags)) {
                dout("con_work %p backing off\n", con);
                if (queue_delayed_work(ceph_msgr_wq, &con->work,
                                       round_jiffies_relative(con->delay))) {
@@ -2135,35 +2302,35 @@ restart:
                }
        }
 
-       if (test_bit(STANDBY, &con->state)) {
+       if (con->state == CON_STATE_STANDBY) {
                dout("con_work %p STANDBY\n", con);
                goto done;
        }
-       if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */
-               dout("con_work CLOSED\n");
-               con_close_socket(con);
+       if (con->state == CON_STATE_CLOSED) {
+               dout("con_work %p CLOSED\n", con);
+               BUG_ON(con->sock);
                goto done;
        }
-       if (test_and_clear_bit(OPENING, &con->state)) {
-               /* reopen w/ new peer */
+       if (con->state == CON_STATE_PREOPEN) {
                dout("con_work OPENING\n");
-               con_close_socket(con);
+               BUG_ON(con->sock);
        }
 
-       if (test_and_clear_bit(SOCK_CLOSED, &con->state))
-               goto fault;
-
        ret = try_read(con);
        if (ret == -EAGAIN)
                goto restart;
-       if (ret < 0)
+       if (ret < 0) {
+               con->error_msg = "socket error on read";
                goto fault;
+       }
 
        ret = try_write(con);
        if (ret == -EAGAIN)
                goto restart;
-       if (ret < 0)
+       if (ret < 0) {
+               con->error_msg = "socket error on write";
                goto fault;
+       }
 
 done:
        mutex_unlock(&con->mutex);
@@ -2172,7 +2339,6 @@ done_unlocked:
        return;
 
 fault:
-       mutex_unlock(&con->mutex);
        ceph_fault(con);     /* error/fault path */
        goto done_unlocked;
 }
@@ -2183,26 +2349,31 @@ fault:
  * exponential backoff
  */
 static void ceph_fault(struct ceph_connection *con)
+       __releases(con->mutex)
 {
        pr_err("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
               ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg);
        dout("fault %p state %lu to peer %s\n",
             con, con->state, ceph_pr_addr(&con->peer_addr.in_addr));
 
-       if (test_bit(LOSSYTX, &con->state)) {
-               dout("fault on LOSSYTX channel\n");
-               goto out;
-       }
-
-       mutex_lock(&con->mutex);
-       if (test_bit(CLOSED, &con->state))
-               goto out_unlock;
+       BUG_ON(con->state != CON_STATE_CONNECTING &&
+              con->state != CON_STATE_NEGOTIATING &&
+              con->state != CON_STATE_OPEN);
 
        con_close_socket(con);
 
+       if (test_bit(CON_FLAG_LOSSYTX, &con->flags)) {
+               dout("fault on LOSSYTX channel, marking CLOSED\n");
+               con->state = CON_STATE_CLOSED;
+               goto out_unlock;
+       }
+
        if (con->in_msg) {
+               BUG_ON(con->in_msg->con != con);
+               con->in_msg->con = NULL;
                ceph_msg_put(con->in_msg);
                con->in_msg = NULL;
+               con->ops->put(con);
        }
 
        /* Requeue anything that hasn't been acked */
@@ -2211,12 +2382,13 @@ static void ceph_fault(struct ceph_connection *con)
        /* If there are no messages queued or keepalive pending, place
         * the connection in a STANDBY state */
        if (list_empty(&con->out_queue) &&
-           !test_bit(KEEPALIVE_PENDING, &con->state)) {
+           !test_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags)) {
                dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con);
-               clear_bit(WRITE_PENDING, &con->state);
-               set_bit(STANDBY, &con->state);
+               clear_bit(CON_FLAG_WRITE_PENDING, &con->flags);
+               con->state = CON_STATE_STANDBY;
        } else {
                /* retry after a delay. */
+               con->state = CON_STATE_PREOPEN;
                if (con->delay == 0)
                        con->delay = BASE_DELAY_INTERVAL;
                else if (con->delay < MAX_DELAY_INTERVAL)
@@ -2237,13 +2409,12 @@ static void ceph_fault(struct ceph_connection *con)
                         * that when con_work restarts we schedule the
                         * delay then.
                         */
-                       set_bit(BACKOFF, &con->state);
+                       set_bit(CON_FLAG_BACKOFF, &con->flags);
                }
        }
 
 out_unlock:
        mutex_unlock(&con->mutex);
-out:
        /*
         * in case we faulted due to authentication, invalidate our
         * current tickets so that we can get new ones.
@@ -2260,18 +2431,14 @@ out:
 
 
 /*
- * create a new messenger instance
+ * initialize a new messenger instance
  */
-struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr,
-                                            u32 supported_features,
-                                            u32 required_features)
+void ceph_messenger_init(struct ceph_messenger *msgr,
+                       struct ceph_entity_addr *myaddr,
+                       u32 supported_features,
+                       u32 required_features,
+                       bool nocrc)
 {
-       struct ceph_messenger *msgr;
-
-       msgr = kzalloc(sizeof(*msgr), GFP_KERNEL);
-       if (msgr == NULL)
-               return ERR_PTR(-ENOMEM);
-
        msgr->supported_features = supported_features;
        msgr->required_features = required_features;
 
@@ -2284,30 +2451,23 @@ struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr,
        msgr->inst.addr.type = 0;
        get_random_bytes(&msgr->inst.addr.nonce, sizeof(msgr->inst.addr.nonce));
        encode_my_addr(msgr);
+       msgr->nocrc = nocrc;
 
-       dout("messenger_create %p\n", msgr);
-       return msgr;
-}
-EXPORT_SYMBOL(ceph_messenger_create);
+       atomic_set(&msgr->stopping, 0);
 
-void ceph_messenger_destroy(struct ceph_messenger *msgr)
-{
-       dout("destroy %p\n", msgr);
-       kfree(msgr);
-       dout("destroyed messenger %p\n", msgr);
+       dout("%s %p\n", __func__, msgr);
 }
-EXPORT_SYMBOL(ceph_messenger_destroy);
+EXPORT_SYMBOL(ceph_messenger_init);
 
 static void clear_standby(struct ceph_connection *con)
 {
        /* come back from STANDBY? */
-       if (test_and_clear_bit(STANDBY, &con->state)) {
-               mutex_lock(&con->mutex);
+       if (con->state == CON_STATE_STANDBY) {
                dout("clear_standby %p and ++connect_seq\n", con);
+               con->state = CON_STATE_PREOPEN;
                con->connect_seq++;
-               WARN_ON(test_bit(WRITE_PENDING, &con->state));
-               WARN_ON(test_bit(KEEPALIVE_PENDING, &con->state));
-               mutex_unlock(&con->mutex);
+               WARN_ON(test_bit(CON_FLAG_WRITE_PENDING, &con->flags));
+               WARN_ON(test_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags));
        }
 }
 
@@ -2316,21 +2476,24 @@ static void clear_standby(struct ceph_connection *con)
  */
 void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
 {
-       if (test_bit(CLOSED, &con->state)) {
-               dout("con_send %p closed, dropping %p\n", con, msg);
-               ceph_msg_put(msg);
-               return;
-       }
-
        /* set src+dst */
        msg->hdr.src = con->msgr->inst.name;
-
        BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len));
-
        msg->needs_out_seq = true;
 
-       /* queue */
        mutex_lock(&con->mutex);
+
+       if (con->state == CON_STATE_CLOSED) {
+               dout("con_send %p closed, dropping %p\n", con, msg);
+               ceph_msg_put(msg);
+               mutex_unlock(&con->mutex);
+               return;
+       }
+
+       BUG_ON(msg->con != NULL);
+       msg->con = con->ops->get(con);
+       BUG_ON(msg->con == NULL);
+
        BUG_ON(!list_empty(&msg->list_head));
        list_add_tail(&msg->list_head, &con->out_queue);
        dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg,
@@ -2339,12 +2502,13 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
             le32_to_cpu(msg->hdr.front_len),
             le32_to_cpu(msg->hdr.middle_len),
             le32_to_cpu(msg->hdr.data_len));
+
+       clear_standby(con);
        mutex_unlock(&con->mutex);
 
        /* if there wasn't anything waiting to send before, queue
         * new work */
-       clear_standby(con);
-       if (test_and_set_bit(WRITE_PENDING, &con->state) == 0)
+       if (test_and_set_bit(CON_FLAG_WRITE_PENDING, &con->flags) == 0)
                queue_con(con);
 }
 EXPORT_SYMBOL(ceph_con_send);
@@ -2352,24 +2516,34 @@ EXPORT_SYMBOL(ceph_con_send);
 /*
  * Revoke a message that was previously queued for send
  */
-void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg)
+void ceph_msg_revoke(struct ceph_msg *msg)
 {
+       struct ceph_connection *con = msg->con;
+
+       if (!con)
+               return;         /* Message not in our possession */
+
        mutex_lock(&con->mutex);
        if (!list_empty(&msg->list_head)) {
-               dout("con_revoke %p msg %p - was on queue\n", con, msg);
+               dout("%s %p msg %p - was on queue\n", __func__, con, msg);
                list_del_init(&msg->list_head);
-               ceph_msg_put(msg);
+               BUG_ON(msg->con == NULL);
+               msg->con->ops->put(msg->con);
+               msg->con = NULL;
                msg->hdr.seq = 0;
+
+               ceph_msg_put(msg);
        }
        if (con->out_msg == msg) {
-               dout("con_revoke %p msg %p - was sending\n", con, msg);
+               dout("%s %p msg %p - was sending\n", __func__, con, msg);
                con->out_msg = NULL;
                if (con->out_kvec_is_msg) {
                        con->out_skip = con->out_kvec_bytes;
                        con->out_kvec_is_msg = false;
                }
-               ceph_msg_put(msg);
                msg->hdr.seq = 0;
+
+               ceph_msg_put(msg);
        }
        mutex_unlock(&con->mutex);
 }
@@ -2377,17 +2551,27 @@ void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg)
 /*
  * Revoke a message that we may be reading data into
  */
-void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg)
+void ceph_msg_revoke_incoming(struct ceph_msg *msg)
 {
+       struct ceph_connection *con;
+
+       BUG_ON(msg == NULL);
+       if (!msg->con) {
+               dout("%s msg %p null con\n", __func__, msg);
+
+               return;         /* Message not in our possession */
+       }
+
+       con = msg->con;
        mutex_lock(&con->mutex);
-       if (con->in_msg && con->in_msg == msg) {
+       if (con->in_msg == msg) {
                unsigned int front_len = le32_to_cpu(con->in_hdr.front_len);
                unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len);
                unsigned int data_len = le32_to_cpu(con->in_hdr.data_len);
 
                /* skip rest of message */
-               dout("con_revoke_pages %p msg %p revoked\n", con, msg);
-                       con->in_base_pos = con->in_base_pos -
+               dout("%s %p msg %p revoked\n", __func__, con, msg);
+               con->in_base_pos = con->in_base_pos -
                                sizeof(struct ceph_msg_header) -
                                front_len -
                                middle_len -
@@ -2398,8 +2582,8 @@ void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg)
                con->in_tag = CEPH_MSGR_TAG_READY;
                con->in_seq++;
        } else {
-               dout("con_revoke_pages %p msg %p pages %p no-op\n",
-                    con, con->in_msg, msg);
+               dout("%s %p in_msg %p msg %p no-op\n",
+                    __func__, con, con->in_msg, msg);
        }
        mutex_unlock(&con->mutex);
 }
@@ -2410,9 +2594,11 @@ void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg)
 void ceph_con_keepalive(struct ceph_connection *con)
 {
        dout("con_keepalive %p\n", con);
+       mutex_lock(&con->mutex);
        clear_standby(con);
-       if (test_and_set_bit(KEEPALIVE_PENDING, &con->state) == 0 &&
-           test_and_set_bit(WRITE_PENDING, &con->state) == 0)
+       mutex_unlock(&con->mutex);
+       if (test_and_set_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags) == 0 &&
+           test_and_set_bit(CON_FLAG_WRITE_PENDING, &con->flags) == 0)
                queue_con(con);
 }
 EXPORT_SYMBOL(ceph_con_keepalive);
@@ -2431,6 +2617,8 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
        if (m == NULL)
                goto out;
        kref_init(&m->kref);
+
+       m->con = NULL;
        INIT_LIST_HEAD(&m->list_head);
 
        m->hdr.tid = 0;
@@ -2526,46 +2714,77 @@ static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
 }
 
 /*
- * Generic message allocator, for incoming messages.
+ * Allocate a message for receiving an incoming message on a
+ * connection, and save the result in con->in_msg.  Uses the
+ * connection's private alloc_msg op if available.
+ *
+ * Returns 0 on success, or a negative error code.
+ *
+ * On success, if we set *skip = 1:
+ *  - the next message should be skipped and ignored.
+ *  - con->in_msg == NULL
+ * or if we set *skip = 0:
+ *  - con->in_msg is non-null.
+ * On error (ENOMEM, EAGAIN, ...),
+ *  - con->in_msg == NULL
  */
-static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
-                               struct ceph_msg_header *hdr,
-                               int *skip)
+static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip)
 {
+       struct ceph_msg_header *hdr = &con->in_hdr;
        int type = le16_to_cpu(hdr->type);
        int front_len = le32_to_cpu(hdr->front_len);
        int middle_len = le32_to_cpu(hdr->middle_len);
-       struct ceph_msg *msg = NULL;
-       int ret;
+       int ret = 0;
+
+       BUG_ON(con->in_msg != NULL);
 
        if (con->ops->alloc_msg) {
+               struct ceph_msg *msg;
+
                mutex_unlock(&con->mutex);
                msg = con->ops->alloc_msg(con, hdr, skip);
                mutex_lock(&con->mutex);
-               if (!msg || *skip)
-                       return NULL;
+               if (con->state != CON_STATE_OPEN) {
+                       ceph_msg_put(msg);
+                       return -EAGAIN;
+               }
+               con->in_msg = msg;
+               if (con->in_msg) {
+                       con->in_msg->con = con->ops->get(con);
+                       BUG_ON(con->in_msg->con == NULL);
+               }
+               if (*skip) {
+                       con->in_msg = NULL;
+                       return 0;
+               }
+               if (!con->in_msg) {
+                       con->error_msg =
+                               "error allocating memory for incoming message";
+                       return -ENOMEM;
+               }
        }
-       if (!msg) {
-               *skip = 0;
-               msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
-               if (!msg) {
+       if (!con->in_msg) {
+               con->in_msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
+               if (!con->in_msg) {
                        pr_err("unable to allocate msg type %d len %d\n",
                               type, front_len);
-                       return NULL;
+                       return -ENOMEM;
                }
-               msg->page_alignment = le16_to_cpu(hdr->data_off);
+               con->in_msg->con = con->ops->get(con);
+               BUG_ON(con->in_msg->con == NULL);
+               con->in_msg->page_alignment = le16_to_cpu(hdr->data_off);
        }
-       memcpy(&msg->hdr, &con->in_hdr, sizeof(con->in_hdr));
+       memcpy(&con->in_msg->hdr, &con->in_hdr, sizeof(con->in_hdr));
 
-       if (middle_len && !msg->middle) {
-               ret = ceph_alloc_middle(con, msg);
+       if (middle_len && !con->in_msg->middle) {
+               ret = ceph_alloc_middle(con, con->in_msg);
                if (ret < 0) {
-                       ceph_msg_put(msg);
-                       return NULL;
+                       ceph_msg_put(con->in_msg);
+                       con->in_msg = NULL;
                }
        }
 
-       return msg;
+       return ret;
 }