Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
author Linus Torvalds <torvalds@linux-foundation.org>
Wed, 28 Mar 2012 17:01:29 +0000 (10:01 -0700)
committer Linus Torvalds <torvalds@linux-foundation.org>
Wed, 28 Mar 2012 17:01:29 +0000 (10:01 -0700)
Pull Ceph updates for 3.4-rc1 from Sage Weil:
 "Alex has been busy.  There are a range of rbd and libceph cleanups,
  especially surrounding device setup and teardown, and a few critical
  fixes in that code.  There are more cleanups in the messenger code,
  virtual xattrs, a fix for CRC calculation/checks, and lots of other
  miscellaneous stuff.

  There's a patch from Amon Ott to make inos behave a bit better on
  32-bit boxes, some decode check fixes from Xi Wang, a network
  throttling fix from Jim Schutt, and a couple of RBD fixes from Josh
  Durgin.

  No new functionality, just a lot of cleanup and bug fixing."

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (65 commits)
  rbd: move snap_rwsem to the device, rename to header_rwsem
  ceph: fix three bugs, two in ceph_vxattrcb_file_layout()
  libceph: isolate kmap() call in write_partial_msg_pages()
  libceph: rename "page_shift" variable to something sensible
  libceph: get rid of zero_page_address
  libceph: only call kernel_sendpage() via helper
  libceph: use kernel_sendpage() for sending zeroes
  libceph: fix inverted crc option logic
  libceph: some simple changes
  libceph: small refactor in write_partial_kvec()
  libceph: do crc calculations outside loop
  libceph: separate CRC calculation from byte swapping
  libceph: use "do" in CRC-related Boolean variables
  ceph: ensure Boolean options support both senses
  libceph: a few small changes
  libceph: make ceph_tcp_connect() return int
  libceph: encapsulate some messenger cleanup code
  libceph: make ceph_msgr_wq private
  libceph: encapsulate connection kvec operations
  libceph: move prepare_write_banner()
  ...

13 files changed:
drivers/block/rbd.c
drivers/block/rbd_types.h
fs/ceph/inode.c
fs/ceph/mds_client.c
fs/ceph/snap.c
fs/ceph/super.c
fs/ceph/super.h
fs/ceph/xattr.c
include/linux/ceph/libceph.h
include/linux/ceph/messenger.h
net/ceph/ceph_common.c
net/ceph/messenger.c
net/ceph/osdmap.c

index a6278e7e61a00bfde01bcb6726524ffb3a06514e..013c7a549fb6dbc3d5d1afe2e01730e951846507 100644 (file)
 
 #include "rbd_types.h"
 
-#define DRV_NAME "rbd"
-#define DRV_NAME_LONG "rbd (rados block device)"
+/*
+ * The basic unit of block I/O is a sector.  It is interpreted in a
+ * number of contexts in Linux (blk, bio, genhd), but the default is
+ * universally 512 bytes.  These symbols are just slightly more
+ * meaningful than the bare numbers they represent.
+ */
+#define        SECTOR_SHIFT    9
+#define        SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
+
+#define RBD_DRV_NAME "rbd"
+#define RBD_DRV_NAME_LONG "rbd (rados block device)"
 
 #define RBD_MINORS_PER_MAJOR   256             /* max minors per blkdev */
 
-#define RBD_MAX_MD_NAME_LEN    (96 + sizeof(RBD_SUFFIX))
+#define RBD_MAX_MD_NAME_LEN    (RBD_MAX_OBJ_NAME_LEN + sizeof(RBD_SUFFIX))
 #define RBD_MAX_POOL_NAME_LEN  64
 #define RBD_MAX_SNAP_NAME_LEN  32
 #define RBD_MAX_OPT_LEN                1024
 
 #define RBD_SNAP_HEAD_NAME     "-"
 
+/*
+ * An RBD device name will be "rbd#", where the "rbd" comes from
+ * RBD_DRV_NAME above, and # is a unique integer identifier.
+ * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
+ * enough to hold all possible device names.
+ */
 #define DEV_NAME_LEN           32
+#define MAX_INT_FORMAT_WIDTH   ((5 * sizeof (int)) / 2 + 1)
 
 #define RBD_NOTIFY_TIMEOUT_DEFAULT 10
 
@@ -66,7 +82,6 @@ struct rbd_image_header {
        __u8 obj_order;
        __u8 crypt_type;
        __u8 comp_type;
-       struct rw_semaphore snap_rwsem;
        struct ceph_snap_context *snapc;
        size_t snap_names_len;
        u64 snap_seq;
@@ -83,7 +98,7 @@ struct rbd_options {
 };
 
 /*
- * an instance of the client.  multiple devices may share a client.
+ * an instance of the client.  multiple devices may share an rbd client.
  */
 struct rbd_client {
        struct ceph_client      *client;
@@ -92,20 +107,9 @@ struct rbd_client {
        struct list_head        node;
 };
 
-struct rbd_req_coll;
-
 /*
- * a single io request
+ * a request completion status
  */
-struct rbd_request {
-       struct request          *rq;            /* blk layer request */
-       struct bio              *bio;           /* cloned bio */
-       struct page             **pages;        /* list of used pages */
-       u64                     len;
-       int                     coll_index;
-       struct rbd_req_coll     *coll;
-};
-
 struct rbd_req_status {
        int done;
        int rc;
@@ -122,6 +126,18 @@ struct rbd_req_coll {
        struct rbd_req_status   status[0];
 };
 
+/*
+ * a single io request
+ */
+struct rbd_request {
+       struct request          *rq;            /* blk layer request */
+       struct bio              *bio;           /* cloned bio */
+       struct page             **pages;        /* list of used pages */
+       u64                     len;
+       int                     coll_index;
+       struct rbd_req_coll     *coll;
+};
+
 struct rbd_snap {
        struct  device          dev;
        const char              *name;
@@ -140,7 +156,6 @@ struct rbd_device {
        struct gendisk          *disk;          /* blkdev's gendisk and rq */
        struct request_queue    *q;
 
-       struct ceph_client      *client;
        struct rbd_client       *rbd_client;
 
        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
@@ -157,6 +172,8 @@ struct rbd_device {
        struct ceph_osd_event   *watch_event;
        struct ceph_osd_request *watch_request;
 
+       /* protects updating the header */
+       struct rw_semaphore     header_rwsem;
        char                    snap_name[RBD_MAX_SNAP_NAME_LEN];
        u32 cur_snap;   /* index+1 of current snapshot within snap context
                           0 - for the head */
@@ -171,15 +188,13 @@ struct rbd_device {
        struct device           dev;
 };
 
-static struct bus_type rbd_bus_type = {
-       .name           = "rbd",
-};
-
-static spinlock_t node_lock;      /* protects client get/put */
-
 static DEFINE_MUTEX(ctl_mutex);          /* Serialize open/close/setup/teardown */
+
 static LIST_HEAD(rbd_dev_list);    /* devices */
-static LIST_HEAD(rbd_client_list);      /* clients */
+static DEFINE_SPINLOCK(rbd_dev_list_lock);
+
+static LIST_HEAD(rbd_client_list);             /* clients */
+static DEFINE_SPINLOCK(rbd_client_list_lock);
 
 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
 static void rbd_dev_release(struct device *dev);
@@ -190,12 +205,32 @@ static ssize_t rbd_snap_add(struct device *dev,
 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
                                  struct rbd_snap *snap);
 
+static ssize_t rbd_add(struct bus_type *bus, const char *buf,
+                      size_t count);
+static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
+                         size_t count);
 
-static struct rbd_device *dev_to_rbd(struct device *dev)
+static struct bus_attribute rbd_bus_attrs[] = {
+       __ATTR(add, S_IWUSR, NULL, rbd_add),
+       __ATTR(remove, S_IWUSR, NULL, rbd_remove),
+       __ATTR_NULL
+};
+
+static struct bus_type rbd_bus_type = {
+       .name           = "rbd",
+       .bus_attrs      = rbd_bus_attrs,
+};
+
+static void rbd_root_dev_release(struct device *dev)
 {
-       return container_of(dev, struct rbd_device, dev);
 }
 
+static struct device rbd_root_dev = {
+       .init_name =    "rbd",
+       .release =      rbd_root_dev_release,
+};
+
+
 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
 {
        return get_device(&rbd_dev->dev);
@@ -210,8 +245,7 @@ static int __rbd_update_snaps(struct rbd_device *rbd_dev);
 
 static int rbd_open(struct block_device *bdev, fmode_t mode)
 {
-       struct gendisk *disk = bdev->bd_disk;
-       struct rbd_device *rbd_dev = disk->private_data;
+       struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
 
        rbd_get_dev(rbd_dev);
 
@@ -256,9 +290,11 @@ static struct rbd_client *rbd_client_create(struct ceph_options *opt,
        kref_init(&rbdc->kref);
        INIT_LIST_HEAD(&rbdc->node);
 
+       mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
+
        rbdc->client = ceph_create_client(opt, rbdc, 0, 0);
        if (IS_ERR(rbdc->client))
-               goto out_rbdc;
+               goto out_mutex;
        opt = NULL; /* Now rbdc->client is responsible for opt */
 
        ret = ceph_open_session(rbdc->client);
@@ -267,16 +303,19 @@ static struct rbd_client *rbd_client_create(struct ceph_options *opt,
 
        rbdc->rbd_opts = rbd_opts;
 
-       spin_lock(&node_lock);
+       spin_lock(&rbd_client_list_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
-       spin_unlock(&node_lock);
+       spin_unlock(&rbd_client_list_lock);
+
+       mutex_unlock(&ctl_mutex);
 
        dout("rbd_client_create created %p\n", rbdc);
        return rbdc;
 
 out_err:
        ceph_destroy_client(rbdc->client);
-out_rbdc:
+out_mutex:
+       mutex_unlock(&ctl_mutex);
        kfree(rbdc);
 out_opt:
        if (opt)
@@ -324,7 +363,7 @@ static int parse_rbd_opts_token(char *c, void *private)
        substring_t argstr[MAX_OPT_ARGS];
        int token, intval, ret;
 
-       token = match_token((char *)c, rbdopt_tokens, argstr);
+       token = match_token(c, rbdopt_tokens, argstr);
        if (token < 0)
                return -EINVAL;
 
@@ -357,58 +396,54 @@ static int parse_rbd_opts_token(char *c, void *private)
  * Get a ceph client with specific addr and configuration, if one does
  * not exist create it.
  */
-static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
-                         char *options)
+static struct rbd_client *rbd_get_client(const char *mon_addr,
+                                        size_t mon_addr_len,
+                                        char *options)
 {
        struct rbd_client *rbdc;
        struct ceph_options *opt;
-       int ret;
        struct rbd_options *rbd_opts;
 
        rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
        if (!rbd_opts)
-               return -ENOMEM;
+               return ERR_PTR(-ENOMEM);
 
        rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
 
-       ret = ceph_parse_options(&opt, options, mon_addr,
-                                mon_addr + strlen(mon_addr), parse_rbd_opts_token, rbd_opts);
-       if (ret < 0)
-               goto done_err;
+       opt = ceph_parse_options(options, mon_addr,
+                               mon_addr + mon_addr_len,
+                               parse_rbd_opts_token, rbd_opts);
+       if (IS_ERR(opt)) {
+               kfree(rbd_opts);
+               return ERR_CAST(opt);
+       }
 
-       spin_lock(&node_lock);
+       spin_lock(&rbd_client_list_lock);
        rbdc = __rbd_client_find(opt);
        if (rbdc) {
+               /* using an existing client */
+               kref_get(&rbdc->kref);
+               spin_unlock(&rbd_client_list_lock);
+
                ceph_destroy_options(opt);
                kfree(rbd_opts);
 
-               /* using an existing client */
-               kref_get(&rbdc->kref);
-               rbd_dev->rbd_client = rbdc;
-               rbd_dev->client = rbdc->client;
-               spin_unlock(&node_lock);
-               return 0;
+               return rbdc;
        }
-       spin_unlock(&node_lock);
+       spin_unlock(&rbd_client_list_lock);
 
        rbdc = rbd_client_create(opt, rbd_opts);
-       if (IS_ERR(rbdc)) {
-               ret = PTR_ERR(rbdc);
-               goto done_err;
-       }
 
-       rbd_dev->rbd_client = rbdc;
-       rbd_dev->client = rbdc->client;
-       return 0;
-done_err:
-       kfree(rbd_opts);
-       return ret;
+       if (IS_ERR(rbdc))
+               kfree(rbd_opts);
+
+       return rbdc;
 }
 
 /*
  * Destroy ceph client
  *
- * Caller must hold node_lock.
+ * Caller must hold rbd_client_list_lock.
  */
 static void rbd_client_release(struct kref *kref)
 {
@@ -428,11 +463,10 @@ static void rbd_client_release(struct kref *kref)
  */
 static void rbd_put_client(struct rbd_device *rbd_dev)
 {
-       spin_lock(&node_lock);
+       spin_lock(&rbd_client_list_lock);
        kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
-       spin_unlock(&node_lock);
+       spin_unlock(&rbd_client_list_lock);
        rbd_dev->rbd_client = NULL;
-       rbd_dev->client = NULL;
 }
 
 /*
@@ -457,21 +491,19 @@ static int rbd_header_from_disk(struct rbd_image_header *header,
                                 gfp_t gfp_flags)
 {
        int i;
-       u32 snap_count = le32_to_cpu(ondisk->snap_count);
-       int ret = -ENOMEM;
+       u32 snap_count;
 
-       if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT))) {
+       if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT)))
                return -ENXIO;
-       }
 
-       init_rwsem(&header->snap_rwsem);
-       header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
+       snap_count = le32_to_cpu(ondisk->snap_count);
        header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
-                               snap_count *
-                                sizeof(struct rbd_image_snap_ondisk),
+                               snap_count * sizeof (*ondisk),
                                gfp_flags);
        if (!header->snapc)
                return -ENOMEM;
+
+       header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
        if (snap_count) {
                header->snap_names = kmalloc(header->snap_names_len,
                                             GFP_KERNEL);
@@ -498,8 +530,7 @@ static int rbd_header_from_disk(struct rbd_image_header *header,
        header->snapc->num_snaps = snap_count;
        header->total_snaps = snap_count;
 
-       if (snap_count &&
-           allocated_snaps == snap_count) {
+       if (snap_count && allocated_snaps == snap_count) {
                for (i = 0; i < snap_count; i++) {
                        header->snapc->snaps[i] =
                                le64_to_cpu(ondisk->snaps[i].id);
@@ -518,7 +549,7 @@ err_names:
        kfree(header->snap_names);
 err_snapc:
        kfree(header->snapc);
-       return ret;
+       return -ENOMEM;
 }
 
 static int snap_index(struct rbd_image_header *header, int snap_num)
@@ -542,35 +573,34 @@ static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
        int i;
        char *p = header->snap_names;
 
-       for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
-               if (strcmp(snap_name, p) == 0)
-                       break;
-       }
-       if (i == header->total_snaps)
-               return -ENOENT;
-       if (seq)
-               *seq = header->snapc->snaps[i];
+       for (i = 0; i < header->total_snaps; i++) {
+               if (!strcmp(snap_name, p)) {
 
-       if (size)
-               *size = header->snap_sizes[i];
+                       /* Found it.  Pass back its id and/or size */
 
-       return i;
+                       if (seq)
+                               *seq = header->snapc->snaps[i];
+                       if (size)
+                               *size = header->snap_sizes[i];
+                       return i;
+               }
+               p += strlen(p) + 1;     /* Skip ahead to the next name */
+       }
+       return -ENOENT;
 }
 
-static int rbd_header_set_snap(struct rbd_device *dev,
-                              const char *snap_name,
-                              u64 *size)
+static int rbd_header_set_snap(struct rbd_device *dev, u64 *size)
 {
        struct rbd_image_header *header = &dev->header;
        struct ceph_snap_context *snapc = header->snapc;
        int ret = -ENOENT;
 
-       down_write(&header->snap_rwsem);
+       BUILD_BUG_ON(sizeof (dev->snap_name) < sizeof (RBD_SNAP_HEAD_NAME));
 
-       if (!snap_name ||
-           !*snap_name ||
-           strcmp(snap_name, "-") == 0 ||
-           strcmp(snap_name, RBD_SNAP_HEAD_NAME) == 0) {
+       down_write(&dev->header_rwsem);
+
+       if (!memcmp(dev->snap_name, RBD_SNAP_HEAD_NAME,
+                   sizeof (RBD_SNAP_HEAD_NAME))) {
                if (header->total_snaps)
                        snapc->seq = header->snap_seq;
                else
@@ -580,7 +610,7 @@ static int rbd_header_set_snap(struct rbd_device *dev,
                if (size)
                        *size = header->image_size;
        } else {
-               ret = snap_by_name(header, snap_name, &snapc->seq, size);
+               ret = snap_by_name(header, dev->snap_name, &snapc->seq, size);
                if (ret < 0)
                        goto done;
 
@@ -590,7 +620,7 @@ static int rbd_header_set_snap(struct rbd_device *dev,
 
        ret = 0;
 done:
-       up_write(&header->snap_rwsem);
+       up_write(&dev->header_rwsem);
        return ret;
 }
 
@@ -717,7 +747,7 @@ static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
 
                        /* split the bio. We'll release it either in the next
                           call, or it will have to be released outside */
-                       bp = bio_split(old_chain, (len - total) / 512ULL);
+                       bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
                        if (!bp)
                                goto err_out;
 
@@ -857,7 +887,7 @@ static int rbd_do_request(struct request *rq,
        struct timespec mtime = CURRENT_TIME;
        struct rbd_request *req_data;
        struct ceph_osd_request_head *reqhead;
-       struct rbd_image_header *header = &dev->header;
+       struct ceph_osd_client *osdc;
 
        req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
        if (!req_data) {
@@ -874,15 +904,13 @@ static int rbd_do_request(struct request *rq,
 
        dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs);
 
-       down_read(&header->snap_rwsem);
+       down_read(&dev->header_rwsem);
 
-       req = ceph_osdc_alloc_request(&dev->client->osdc, flags,
-                                     snapc,
-                                     ops,
-                                     false,
-                                     GFP_NOIO, pages, bio);
+       osdc = &dev->rbd_client->client->osdc;
+       req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
+                                       false, GFP_NOIO, pages, bio);
        if (!req) {
-               up_read(&header->snap_rwsem);
+               up_read(&dev->header_rwsem);
                ret = -ENOMEM;
                goto done_pages;
        }
@@ -909,27 +937,27 @@ static int rbd_do_request(struct request *rq,
        layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
        layout->fl_pg_preferred = cpu_to_le32(-1);
        layout->fl_pg_pool = cpu_to_le32(dev->poolid);
-       ceph_calc_raw_layout(&dev->client->osdc, layout, snapid,
-                            ofs, &len, &bno, req, ops);
+       ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
+                               req, ops);
 
        ceph_osdc_build_request(req, ofs, &len,
                                ops,
                                snapc,
                                &mtime,
                                req->r_oid, req->r_oid_len);
-       up_read(&header->snap_rwsem);
+       up_read(&dev->header_rwsem);
 
        if (linger_req) {
-               ceph_osdc_set_request_linger(&dev->client->osdc, req);
+               ceph_osdc_set_request_linger(osdc, req);
                *linger_req = req;
        }
 
-       ret = ceph_osdc_start_request(&dev->client->osdc, req, false);
+       ret = ceph_osdc_start_request(osdc, req, false);
        if (ret < 0)
                goto done_err;
 
        if (!rbd_cb) {
-               ret = ceph_osdc_wait_request(&dev->client->osdc, req);
+               ret = ceph_osdc_wait_request(osdc, req);
                if (ver)
                        *ver = le64_to_cpu(req->r_reassert_version.version);
                dout("reassert_ver=%lld\n",
@@ -1213,8 +1241,8 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
        rc = __rbd_update_snaps(dev);
        mutex_unlock(&ctl_mutex);
        if (rc)
-               pr_warning(DRV_NAME "%d got notification but failed to update"
-                          " snaps: %d\n", dev->major, rc);
+               pr_warning(RBD_DRV_NAME "%d got notification but failed to "
+                          " update snaps: %d\n", dev->major, rc);
 
        rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
 }
@@ -1227,7 +1255,7 @@ static int rbd_req_sync_watch(struct rbd_device *dev,
                              u64 ver)
 {
        struct ceph_osd_req_op *ops;
-       struct ceph_osd_client *osdc = &dev->client->osdc;
+       struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
 
        int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
        if (ret < 0)
@@ -1314,7 +1342,7 @@ static int rbd_req_sync_notify(struct rbd_device *dev,
                          const char *obj)
 {
        struct ceph_osd_req_op *ops;
-       struct ceph_osd_client *osdc = &dev->client->osdc;
+       struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
        struct ceph_osd_event *event;
        struct rbd_notify_info info;
        int payload_len = sizeof(u32) + sizeof(u32);
@@ -1421,9 +1449,7 @@ static void rbd_rq_fn(struct request_queue *q)
        struct request *rq;
        struct bio_pair *bp = NULL;
 
-       rq = blk_fetch_request(q);
-
-       while (1) {
+       while ((rq = blk_fetch_request(q))) {
                struct bio *bio;
                struct bio *rq_bio, *next_bio = NULL;
                bool do_write;
@@ -1441,32 +1467,32 @@ static void rbd_rq_fn(struct request_queue *q)
                /* filter out block requests we don't understand */
                if ((rq->cmd_type != REQ_TYPE_FS)) {
                        __blk_end_request_all(rq, 0);
-                       goto next;
+                       continue;
                }
 
                /* deduce our operation (read, write) */
                do_write = (rq_data_dir(rq) == WRITE);
 
                size = blk_rq_bytes(rq);
-               ofs = blk_rq_pos(rq) * 512ULL;
+               ofs = blk_rq_pos(rq) * SECTOR_SIZE;
                rq_bio = rq->bio;
                if (do_write && rbd_dev->read_only) {
                        __blk_end_request_all(rq, -EROFS);
-                       goto next;
+                       continue;
                }
 
                spin_unlock_irq(q->queue_lock);
 
                dout("%s 0x%x bytes at 0x%llx\n",
                     do_write ? "write" : "read",
-                    size, blk_rq_pos(rq) * 512ULL);
+                    size, blk_rq_pos(rq) * SECTOR_SIZE);
 
                num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
                coll = rbd_alloc_coll(num_segs);
                if (!coll) {
                        spin_lock_irq(q->queue_lock);
                        __blk_end_request_all(rq, -ENOMEM);
-                       goto next;
+                       continue;
                }
 
                do {
@@ -1512,8 +1538,6 @@ next_seg:
                if (bp)
                        bio_pair_release(bp);
                spin_lock_irq(q->queue_lock);
-next:
-               rq = blk_fetch_request(q);
        }
 }
 
@@ -1526,13 +1550,17 @@ static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
                          struct bio_vec *bvec)
 {
        struct rbd_device *rbd_dev = q->queuedata;
-       unsigned int chunk_sectors = 1 << (rbd_dev->header.obj_order - 9);
-       sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
-       unsigned int bio_sectors = bmd->bi_size >> 9;
+       unsigned int chunk_sectors;
+       sector_t sector;
+       unsigned int bio_sectors;
        int max;
 
+       chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
+       sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
+       bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
+
        max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
-                                + bio_sectors)) << 9;
+                                + bio_sectors)) << SECTOR_SHIFT;
        if (max < 0)
                max = 0; /* bio_add cannot handle a negative return */
        if (max <= bvec->bv_len && bio_sectors == 0)
@@ -1565,15 +1593,16 @@ static int rbd_read_header(struct rbd_device *rbd_dev,
        ssize_t rc;
        struct rbd_image_header_ondisk *dh;
        int snap_count = 0;
-       u64 snap_names_len = 0;
        u64 ver;
+       size_t len;
 
+       /*
+        * First reads the fixed-size header to determine the number
+        * of snapshots, then re-reads it, along with all snapshot
+        * records as well as their stored names.
+        */
+       len = sizeof (*dh);
        while (1) {
-               int len = sizeof(*dh) +
-                         snap_count * sizeof(struct rbd_image_snap_ondisk) +
-                         snap_names_len;
-
-               rc = -ENOMEM;
                dh = kmalloc(len, GFP_KERNEL);
                if (!dh)
                        return -ENOMEM;
@@ -1588,21 +1617,22 @@ static int rbd_read_header(struct rbd_device *rbd_dev,
 
                rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
                if (rc < 0) {
-                       if (rc == -ENXIO) {
+                       if (rc == -ENXIO)
                                pr_warning("unrecognized header format"
                                           " for image %s", rbd_dev->obj);
-                       }
                        goto out_dh;
                }
 
-               if (snap_count != header->total_snaps) {
-                       snap_count = header->total_snaps;
-                       snap_names_len = header->snap_names_len;
-                       rbd_header_free(header);
-                       kfree(dh);
-                       continue;
-               }
-               break;
+               if (snap_count == header->total_snaps)
+                       break;
+
+               snap_count = header->total_snaps;
+               len = sizeof (*dh) +
+                       snap_count * sizeof(struct rbd_image_snap_ondisk) +
+                       header->snap_names_len;
+
+               rbd_header_free(header);
+               kfree(dh);
        }
        header->obj_version = ver;
 
@@ -1623,13 +1653,14 @@ static int rbd_header_add_snap(struct rbd_device *dev,
        int ret;
        void *data, *p, *e;
        u64 ver;
+       struct ceph_mon_client *monc;
 
        /* we should create a snapshot only if we're pointing at the head */
        if (dev->cur_snap)
                return -EINVAL;
 
-       ret = ceph_monc_create_snapid(&dev->client->monc, dev->poolid,
-                                     &new_snapid);
+       monc = &dev->rbd_client->client->monc;
+       ret = ceph_monc_create_snapid(monc, dev->poolid, &new_snapid);
        dout("created snapid=%lld\n", new_snapid);
        if (ret < 0)
                return ret;
@@ -1684,9 +1715,9 @@ static int __rbd_update_snaps(struct rbd_device *rbd_dev)
                return ret;
 
        /* resized? */
-       set_capacity(rbd_dev->disk, h.image_size / 512ULL);
+       set_capacity(rbd_dev->disk, h.image_size / SECTOR_SIZE);
 
-       down_write(&rbd_dev->header.snap_rwsem);
+       down_write(&rbd_dev->header_rwsem);
 
        snap_seq = rbd_dev->header.snapc->seq;
        if (rbd_dev->header.total_snaps &&
@@ -1711,7 +1742,7 @@ static int __rbd_update_snaps(struct rbd_device *rbd_dev)
 
        ret = __rbd_init_snaps_header(rbd_dev);
 
-       up_write(&rbd_dev->header.snap_rwsem);
+       up_write(&rbd_dev->header_rwsem);
 
        return ret;
 }
@@ -1721,6 +1752,7 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
        struct gendisk *disk;
        struct request_queue *q;
        int rc;
+       u64 segment_size;
        u64 total_size = 0;
 
        /* contact OSD, request size info about the object being mapped */
@@ -1733,7 +1765,7 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
        if (rc)
                return rc;
 
-       rc = rbd_header_set_snap(rbd_dev, rbd_dev->snap_name, &total_size);
+       rc = rbd_header_set_snap(rbd_dev, &total_size);
        if (rc)
                return rc;
 
@@ -1743,7 +1775,7 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
        if (!disk)
                goto out;
 
-       snprintf(disk->disk_name, sizeof(disk->disk_name), DRV_NAME "%d",
+       snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
                 rbd_dev->id);
        disk->major = rbd_dev->major;
        disk->first_minor = 0;
@@ -1756,11 +1788,15 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
        if (!q)
                goto out_disk;
 
+       /* We use the default size, but let's be explicit about it. */
+       blk_queue_physical_block_size(q, SECTOR_SIZE);
+
        /* set io sizes to object size */
-       blk_queue_max_hw_sectors(q, rbd_obj_bytes(&rbd_dev->header) / 512ULL);
-       blk_queue_max_segment_size(q, rbd_obj_bytes(&rbd_dev->header));
-       blk_queue_io_min(q, rbd_obj_bytes(&rbd_dev->header));
-       blk_queue_io_opt(q, rbd_obj_bytes(&rbd_dev->header));
+       segment_size = rbd_obj_bytes(&rbd_dev->header);
+       blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
+       blk_queue_max_segment_size(q, segment_size);
+       blk_queue_io_min(q, segment_size);
+       blk_queue_io_opt(q, segment_size);
 
        blk_queue_merge_bvec(q, rbd_merge_bvec);
        disk->queue = q;
@@ -1771,7 +1807,7 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
        rbd_dev->q = q;
 
        /* finally, announce the disk to the world */
-       set_capacity(disk, total_size / 512ULL);
+       set_capacity(disk, total_size / SECTOR_SIZE);
        add_disk(disk);
 
        pr_info("%s: added with size 0x%llx\n",
@@ -1788,10 +1824,15 @@ out:
   sysfs
 */
 
+static struct rbd_device *dev_to_rbd_dev(struct device *dev)
+{
+       return container_of(dev, struct rbd_device, dev);
+}
+
 static ssize_t rbd_size_show(struct device *dev,
                             struct device_attribute *attr, char *buf)
 {
-       struct rbd_device *rbd_dev = dev_to_rbd(dev);
+       struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
        return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
 }
@@ -1799,7 +1840,7 @@ static ssize_t rbd_size_show(struct device *dev,
 static ssize_t rbd_major_show(struct device *dev,
                              struct device_attribute *attr, char *buf)
 {
-       struct rbd_device *rbd_dev = dev_to_rbd(dev);
+       struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
        return sprintf(buf, "%d\n", rbd_dev->major);
 }
@@ -1807,15 +1848,16 @@ static ssize_t rbd_major_show(struct device *dev,
 static ssize_t rbd_client_id_show(struct device *dev,
                                  struct device_attribute *attr, char *buf)
 {
-       struct rbd_device *rbd_dev = dev_to_rbd(dev);
+       struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
-       return sprintf(buf, "client%lld\n", ceph_client_id(rbd_dev->client));
+       return sprintf(buf, "client%lld\n",
+                       ceph_client_id(rbd_dev->rbd_client->client));
 }
 
 static ssize_t rbd_pool_show(struct device *dev,
                             struct device_attribute *attr, char *buf)
 {
-       struct rbd_device *rbd_dev = dev_to_rbd(dev);
+       struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
        return sprintf(buf, "%s\n", rbd_dev->pool_name);
 }
@@ -1823,7 +1865,7 @@ static ssize_t rbd_pool_show(struct device *dev,
 static ssize_t rbd_name_show(struct device *dev,
                             struct device_attribute *attr, char *buf)
 {
-       struct rbd_device *rbd_dev = dev_to_rbd(dev);
+       struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
        return sprintf(buf, "%s\n", rbd_dev->obj);
 }
@@ -1832,7 +1874,7 @@ static ssize_t rbd_snap_show(struct device *dev,
                             struct device_attribute *attr,
                             char *buf)
 {
-       struct rbd_device *rbd_dev = dev_to_rbd(dev);
+       struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
        return sprintf(buf, "%s\n", rbd_dev->snap_name);
 }
@@ -1842,7 +1884,7 @@ static ssize_t rbd_image_refresh(struct device *dev,
                                 const char *buf,
                                 size_t size)
 {
-       struct rbd_device *rbd_dev = dev_to_rbd(dev);
+       struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
        int rc;
        int ret = size;
 
@@ -1907,7 +1949,7 @@ static ssize_t rbd_snap_size_show(struct device *dev,
 {
        struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
 
-       return sprintf(buf, "%lld\n", (long long)snap->size);
+       return sprintf(buf, "%zd\n", snap->size);
 }
 
 static ssize_t rbd_snap_id_show(struct device *dev,
@@ -1916,7 +1958,7 @@ static ssize_t rbd_snap_id_show(struct device *dev,
 {
        struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
 
-       return sprintf(buf, "%lld\n", (long long)snap->id);
+       return sprintf(buf, "%llu\n", (unsigned long long) snap->id);
 }
 
 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
@@ -2088,19 +2130,9 @@ static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
        return 0;
 }
 
-
-static void rbd_root_dev_release(struct device *dev)
-{
-}
-
-static struct device rbd_root_dev = {
-       .init_name =    "rbd",
-       .release =      rbd_root_dev_release,
-};
-
 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
 {
-       int ret = -ENOMEM;
+       int ret;
        struct device *dev;
        struct rbd_snap *snap;
 
@@ -2114,7 +2146,7 @@ static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
        dev_set_name(dev, "%d", rbd_dev->id);
        ret = device_register(dev);
        if (ret < 0)
-               goto done_free;
+               goto out;
 
        list_for_each_entry(snap, &rbd_dev->snaps, node) {
                ret = rbd_register_snap_dev(rbd_dev, snap,
@@ -2122,10 +2154,7 @@ static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
                if (ret < 0)
                        break;
        }
-
-       mutex_unlock(&ctl_mutex);
-       return 0;
-done_free:
+out:
        mutex_unlock(&ctl_mutex);
        return ret;
 }
@@ -2154,104 +2183,250 @@ static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
        return ret;
 }
 
+static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
+
+/*
+ * Get a unique rbd identifier for the given new rbd_dev, and add
+ * the rbd_dev to the global list.  The minimum rbd id is 1.
+ */
+static void rbd_id_get(struct rbd_device *rbd_dev)
+{
+       rbd_dev->id = atomic64_inc_return(&rbd_id_max);
+
+       spin_lock(&rbd_dev_list_lock);
+       list_add_tail(&rbd_dev->node, &rbd_dev_list);
+       spin_unlock(&rbd_dev_list_lock);
+}
+
+/*
+ * Remove an rbd_dev from the global list, and record that its
+ * identifier is no longer in use.
+ */
+static void rbd_id_put(struct rbd_device *rbd_dev)
+{
+       struct list_head *tmp;
+       int rbd_id = rbd_dev->id;
+       int max_id;
+
+       BUG_ON(rbd_id < 1);
+
+       spin_lock(&rbd_dev_list_lock);
+       list_del_init(&rbd_dev->node);
+
+       /*
+        * If the id being "put" is not the current maximum, there
+        * is nothing special we need to do.
+        */
+       if (rbd_id != atomic64_read(&rbd_id_max)) {
+               spin_unlock(&rbd_dev_list_lock);
+               return;
+       }
+
+       /*
+        * We need to update the current maximum id.  Search the
+        * remaining list entries (the one being put is already
+        * unlinked) for the largest id still in use; 0 if none.
+        */
+       max_id = 0;
+       list_for_each_prev(tmp, &rbd_dev_list) {
+               struct rbd_device *entry;
+
+               entry = list_entry(tmp, struct rbd_device, node);
+               if (entry->id > max_id)
+                       max_id = entry->id;
+       }
+       spin_unlock(&rbd_dev_list_lock);
+
+       /*
+        * The max id could have been updated by rbd_id_get(), in
+        * which case it now accurately reflects the new maximum.
+        * Be careful not to overwrite the maximum value in that
+        * case.
+        */
+       atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
+}
+
+/*
+ * Skips over white space at *buf, and updates *buf to point to the
+ * first found non-space character (if any). Returns the length of
+ * the token (string of non-white space characters) found.  Note
+ * that *buf must be terminated with '\0'.
+ */
+static inline size_t next_token(const char **buf)
+{
+        /*
+        * These are the characters that produce nonzero for
+        * isspace() in the "C" and "POSIX" locales.
+        */
+        const char *spaces = " \f\n\r\t\v";
+
+        *buf += strspn(*buf, spaces);  /* Find start of token */
+
+       return strcspn(*buf, spaces);   /* Return token length */
+}
+
+/*
+ * Finds the next token in *buf, and if the provided token buffer is
+ * big enough, copies the found token into it.  The result, if
+ * copied, is guaranteed to be terminated with '\0'.  Note that *buf
+ * must be terminated with '\0' on entry.
+ *
+ * Returns the length of the token found (not including the '\0').
+ * Return value will be 0 if no token is found, and it will be >=
+ * token_size if the token would not fit.
+ *
+ * The *buf pointer will be updated to point beyond the end of the
+ * found token.  Note that this occurs even if the token buffer is
+ * too small to hold it.
+ */
+static inline size_t copy_token(const char **buf,
+                               char *token,
+                               size_t token_size)
+{
+        size_t len;
+
+       len = next_token(buf);
+       if (len < token_size) {
+               memcpy(token, *buf, len);
+               *(token + len) = '\0';
+       }
+       *buf += len;
+
+        return len;
+}
+
+/*
+ * This fills in the pool_name, obj, obj_len, obj_md_name, and
+ * snap_name fields of the given rbd_dev, and returns the monitor
+ * address span (*mon_addrs, *mon_addrs_size) and the options token,
+ * all parsed from the string written to /sys/bus/rbd/add.
+ */
+static int rbd_add_parse_args(struct rbd_device *rbd_dev,
+                             const char *buf,
+                             const char **mon_addrs,
+                             size_t *mon_addrs_size,
+                             char *options,
+                             size_t options_size)
+{
+       size_t  len;
+
+       /* The first four tokens are required */
+
+       len = next_token(&buf);
+       if (!len)
+               return -EINVAL;
+       *mon_addrs_size = len + 1;
+       *mon_addrs = buf;
+
+       buf += len;
+
+       len = copy_token(&buf, options, options_size);
+       if (!len || len >= options_size)
+               return -EINVAL;
+
+       len = copy_token(&buf, rbd_dev->pool_name, sizeof (rbd_dev->pool_name));
+       if (!len || len >= sizeof (rbd_dev->pool_name))
+               return -EINVAL;
+
+       len = copy_token(&buf, rbd_dev->obj, sizeof (rbd_dev->obj));
+       if (!len || len >= sizeof (rbd_dev->obj))
+               return -EINVAL;
+
+       /* We have the object length in hand, save it. */
+
+       rbd_dev->obj_len = len;
+
+       BUILD_BUG_ON(RBD_MAX_MD_NAME_LEN
+                               < RBD_MAX_OBJ_NAME_LEN + sizeof (RBD_SUFFIX));
+       sprintf(rbd_dev->obj_md_name, "%s%s", rbd_dev->obj, RBD_SUFFIX);
+
+       /*
+        * The snapshot name is optional, but it's an error if it's
+        * too long.  If no snapshot is supplied, fill in the default.
+        */
+       len = copy_token(&buf, rbd_dev->snap_name, sizeof (rbd_dev->snap_name));
+       if (!len)
+               memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
+                       sizeof (RBD_SNAP_HEAD_NAME));
+       else if (len >= sizeof (rbd_dev->snap_name))
+               return -EINVAL;
+
+       return 0;
+}
+
 static ssize_t rbd_add(struct bus_type *bus,
                       const char *buf,
                       size_t count)
 {
-       struct ceph_osd_client *osdc;
        struct rbd_device *rbd_dev;
-       ssize_t rc = -ENOMEM;
-       int irc, new_id = 0;
-       struct list_head *tmp;
-       char *mon_dev_name;
-       char *options;
+       const char *mon_addrs = NULL;
+       size_t mon_addrs_size = 0;
+       char *options = NULL;
+       struct ceph_osd_client *osdc;
+       int rc = -ENOMEM;
 
        if (!try_module_get(THIS_MODULE))
                return -ENODEV;
 
-       mon_dev_name = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
-       if (!mon_dev_name)
-               goto err_out_mod;
-
-       options = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
-       if (!options)
-               goto err_mon_dev;
-
-       /* new rbd_device object */
        rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
        if (!rbd_dev)
-               goto err_out_opt;
+               goto err_nomem;
+       options = kmalloc(count, GFP_KERNEL);
+       if (!options)
+               goto err_nomem;
 
        /* static rbd_device initialization */
        spin_lock_init(&rbd_dev->lock);
        INIT_LIST_HEAD(&rbd_dev->node);
        INIT_LIST_HEAD(&rbd_dev->snaps);
+       init_rwsem(&rbd_dev->header_rwsem);
 
-       init_rwsem(&rbd_dev->header.snap_rwsem);
+       init_rwsem(&rbd_dev->header_rwsem);
 
        /* generate unique id: find highest unique id, add one */
-       mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
-
-       list_for_each(tmp, &rbd_dev_list) {
-               struct rbd_device *rbd_dev;
+       rbd_id_get(rbd_dev);
 
-               rbd_dev = list_entry(tmp, struct rbd_device, node);
-               if (rbd_dev->id >= new_id)
-                       new_id = rbd_dev->id + 1;
-       }
-
-       rbd_dev->id = new_id;
-
-       /* add to global list */
-       list_add_tail(&rbd_dev->node, &rbd_dev_list);
+       /* Fill in the device name, now that we have its id. */
+       BUILD_BUG_ON(DEV_NAME_LEN
+                       < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
+       sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->id);
 
        /* parse add command */
-       if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s "
-                  "%" __stringify(RBD_MAX_OPT_LEN) "s "
-                  "%" __stringify(RBD_MAX_POOL_NAME_LEN) "s "
-                  "%" __stringify(RBD_MAX_OBJ_NAME_LEN) "s"
-                  "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
-                  mon_dev_name, options, rbd_dev->pool_name,
-                  rbd_dev->obj, rbd_dev->snap_name) < 4) {
-               rc = -EINVAL;
-               goto err_out_slot;
-       }
-
-       if (rbd_dev->snap_name[0] == 0)
-               rbd_dev->snap_name[0] = '-';
-
-       rbd_dev->obj_len = strlen(rbd_dev->obj);
-       snprintf(rbd_dev->obj_md_name, sizeof(rbd_dev->obj_md_name), "%s%s",
-                rbd_dev->obj, RBD_SUFFIX);
-
-       /* initialize rest of new object */
-       snprintf(rbd_dev->name, DEV_NAME_LEN, DRV_NAME "%d", rbd_dev->id);
-       rc = rbd_get_client(rbd_dev, mon_dev_name, options);
-       if (rc < 0)
-               goto err_out_slot;
+       rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
+                               options, count);
+       if (rc)
+               goto err_put_id;
 
-       mutex_unlock(&ctl_mutex);
+       rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
+                                               options);
+       if (IS_ERR(rbd_dev->rbd_client)) {
+               rc = PTR_ERR(rbd_dev->rbd_client);
+               goto err_put_id;
+       }
 
        /* pick the pool */
-       osdc = &rbd_dev->client->osdc;
+       osdc = &rbd_dev->rbd_client->client->osdc;
        rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
        if (rc < 0)
                goto err_out_client;
        rbd_dev->poolid = rc;
 
        /* register our block device */
-       irc = register_blkdev(0, rbd_dev->name);
-       if (irc < 0) {
-               rc = irc;
+       rc = register_blkdev(0, rbd_dev->name);
+       if (rc < 0)
                goto err_out_client;
-       }
-       rbd_dev->major = irc;
+       rbd_dev->major = rc;
 
        rc = rbd_bus_add_dev(rbd_dev);
        if (rc)
                goto err_out_blkdev;
 
-       /* set up and announce blkdev mapping */
+       /*
+        * At this point cleanup in the event of an error is the job
+        * of the sysfs code (initiated by rbd_bus_del_dev()).
+        *
+        * Set up and announce blkdev mapping.
+        */
        rc = rbd_init_disk(rbd_dev);
        if (rc)
                goto err_out_bus;
@@ -2263,35 +2438,26 @@ static ssize_t rbd_add(struct bus_type *bus,
        return count;
 
 err_out_bus:
-       mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
-       list_del_init(&rbd_dev->node);
-       mutex_unlock(&ctl_mutex);
-
        /* this will also clean up rest of rbd_dev stuff */
 
        rbd_bus_del_dev(rbd_dev);
        kfree(options);
-       kfree(mon_dev_name);
        return rc;
 
 err_out_blkdev:
        unregister_blkdev(rbd_dev->major, rbd_dev->name);
 err_out_client:
        rbd_put_client(rbd_dev);
-       mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
-err_out_slot:
-       list_del_init(&rbd_dev->node);
-       mutex_unlock(&ctl_mutex);
-
-       kfree(rbd_dev);
-err_out_opt:
+err_put_id:
+       rbd_id_put(rbd_dev);
+err_nomem:
        kfree(options);
-err_mon_dev:
-       kfree(mon_dev_name);
-err_out_mod:
+       kfree(rbd_dev);
+
        dout("Error adding device %s\n", buf);
        module_put(THIS_MODULE);
-       return rc;
+
+       return (ssize_t) rc;
 }
 
 static struct rbd_device *__rbd_get_dev(unsigned long id)
@@ -2299,22 +2465,28 @@ static struct rbd_device *__rbd_get_dev(unsigned long id)
        struct list_head *tmp;
        struct rbd_device *rbd_dev;
 
+       spin_lock(&rbd_dev_list_lock);
        list_for_each(tmp, &rbd_dev_list) {
                rbd_dev = list_entry(tmp, struct rbd_device, node);
-               if (rbd_dev->id == id)
+               if (rbd_dev->id == id) {
+                       spin_unlock(&rbd_dev_list_lock);
                        return rbd_dev;
+               }
        }
+       spin_unlock(&rbd_dev_list_lock);
        return NULL;
 }
 
 static void rbd_dev_release(struct device *dev)
 {
-       struct rbd_device *rbd_dev =
-                       container_of(dev, struct rbd_device, dev);
+       struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
-       if (rbd_dev->watch_request)
-               ceph_osdc_unregister_linger_request(&rbd_dev->client->osdc,
+       if (rbd_dev->watch_request) {
+               struct ceph_client *client = rbd_dev->rbd_client->client;
+
+               ceph_osdc_unregister_linger_request(&client->osdc,
                                                    rbd_dev->watch_request);
+       }
        if (rbd_dev->watch_event)
                rbd_req_sync_unwatch(rbd_dev, rbd_dev->obj_md_name);
 
@@ -2323,6 +2495,9 @@ static void rbd_dev_release(struct device *dev)
        /* clean up and free blkdev */
        rbd_free_disk(rbd_dev);
        unregister_blkdev(rbd_dev->major, rbd_dev->name);
+
+       /* done with the id, and with the rbd_dev */
+       rbd_id_put(rbd_dev);
        kfree(rbd_dev);
 
        /* release module ref */
@@ -2355,8 +2530,6 @@ static ssize_t rbd_remove(struct bus_type *bus,
                goto done;
        }
 
-       list_del_init(&rbd_dev->node);
-
        __rbd_remove_all_snaps(rbd_dev);
        rbd_bus_del_dev(rbd_dev);
 
@@ -2370,7 +2543,7 @@ static ssize_t rbd_snap_add(struct device *dev,
                            const char *buf,
                            size_t count)
 {
-       struct rbd_device *rbd_dev = dev_to_rbd(dev);
+       struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
        int ret;
        char *name = kmalloc(count + 1, GFP_KERNEL);
        if (!name)
@@ -2406,12 +2579,6 @@ err_unlock:
        return ret;
 }
 
-static struct bus_attribute rbd_bus_attrs[] = {
-       __ATTR(add, S_IWUSR, NULL, rbd_add),
-       __ATTR(remove, S_IWUSR, NULL, rbd_remove),
-       __ATTR_NULL
-};
-
 /*
  * create control files in sysfs
  * /sys/bus/rbd/...
@@ -2420,21 +2587,21 @@ static int rbd_sysfs_init(void)
 {
        int ret;
 
-       rbd_bus_type.bus_attrs = rbd_bus_attrs;
-
-       ret = bus_register(&rbd_bus_type);
-        if (ret < 0)
+       ret = device_register(&rbd_root_dev);
+       if (ret < 0)
                return ret;
 
-       ret = device_register(&rbd_root_dev);
+       ret = bus_register(&rbd_bus_type);
+       if (ret < 0)
+               device_unregister(&rbd_root_dev);
 
        return ret;
 }
 
 static void rbd_sysfs_cleanup(void)
 {
-       device_unregister(&rbd_root_dev);
        bus_unregister(&rbd_bus_type);
+       device_unregister(&rbd_root_dev);
 }
 
 int __init rbd_init(void)
@@ -2444,8 +2611,7 @@ int __init rbd_init(void)
        rc = rbd_sysfs_init();
        if (rc)
                return rc;
-       spin_lock_init(&node_lock);
-       pr_info("loaded " DRV_NAME_LONG "\n");
+       pr_info("loaded " RBD_DRV_NAME_LONG "\n");
        return 0;
 }
 
index fc6c678aa2cb3971333f602eb9d71008d73041ac..950708688f1719109962e86b450186c845f89cf7 100644 (file)
 #define RBD_HEADER_SIGNATURE   "RBD"
 #define RBD_HEADER_VERSION     "001.005"
 
-struct rbd_info {
-       __le64 max_id;
-} __attribute__ ((packed));
-
 struct rbd_image_snap_ondisk {
        __le64 id;
        __le64 image_size;
index 2c489378b4cd6b470d06ecc9f3c60fc67ea6e1cf..9fff9f3b17e4a5206a9073cec2be714fba44bda0 100644 (file)
@@ -677,18 +677,19 @@ static int fill_inode(struct inode *inode,
        case S_IFLNK:
                inode->i_op = &ceph_symlink_iops;
                if (!ci->i_symlink) {
-                       int symlen = iinfo->symlink_len;
+                       u32 symlen = iinfo->symlink_len;
                        char *sym;
 
-                       BUG_ON(symlen != inode->i_size);
                        spin_unlock(&ci->i_ceph_lock);
 
+                       err = -EINVAL;
+                       if (WARN_ON(symlen != inode->i_size))
+                               goto out;
+
                        err = -ENOMEM;
-                       sym = kmalloc(symlen+1, GFP_NOFS);
+                       sym = kstrndup(iinfo->symlink, symlen, GFP_NOFS);
                        if (!sym)
                                goto out;
-                       memcpy(sym, iinfo->symlink, symlen);
-                       sym[symlen] = 0;
 
                        spin_lock(&ci->i_ceph_lock);
                        if (!ci->i_symlink)
index 866e8d7ca37d7343fe7c30cc5e036cb5d6b9494e..89971e137aab80454fed8a51a105d7df903b3101 100644 (file)
@@ -402,7 +402,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
 
        spin_lock_init(&s->s_gen_ttl_lock);
        s->s_cap_gen = 0;
-       s->s_cap_ttl = 0;
+       s->s_cap_ttl = jiffies - 1;
 
        spin_lock_init(&s->s_cap_lock);
        s->s_renew_requested = 0;
@@ -1083,8 +1083,7 @@ static void renewed_caps(struct ceph_mds_client *mdsc,
        int wake = 0;
 
        spin_lock(&session->s_cap_lock);
-       was_stale = is_renew && (session->s_cap_ttl == 0 ||
-                                time_after_eq(jiffies, session->s_cap_ttl));
+       was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);
 
        session->s_cap_ttl = session->s_renew_requested +
                mdsc->mdsmap->m_session_timeout*HZ;
@@ -2332,7 +2331,7 @@ static void handle_session(struct ceph_mds_session *session,
                        session->s_mds);
                spin_lock(&session->s_gen_ttl_lock);
                session->s_cap_gen++;
-               session->s_cap_ttl = 0;
+               session->s_cap_ttl = jiffies - 1;
                spin_unlock(&session->s_gen_ttl_lock);
                send_renew_caps(mdsc, session);
                break;
index a559c80f127a04353a488181029744a009165f09..f04c0961f9937eb6f553978f10942800818528ba 100644 (file)
@@ -331,7 +331,7 @@ static int build_snap_context(struct ceph_snap_realm *realm)
 
        /* alloc new snap context */
        err = -ENOMEM;
-       if (num > ULONG_MAX / sizeof(u64) - sizeof(*snapc))
+       if (num > (ULONG_MAX - sizeof(*snapc)) / sizeof(u64))
                goto fail;
        snapc = kzalloc(sizeof(*snapc) + num*sizeof(u64), GFP_NOFS);
        if (!snapc)
index 256f852219261e66984384fe19bb7182bf8dd50a..1e67dd7305a4e8596a3c5f609ccea2a866e4e31a 100644 (file)
@@ -130,10 +130,12 @@ enum {
        Opt_nodirstat,
        Opt_rbytes,
        Opt_norbytes,
+       Opt_asyncreaddir,
        Opt_noasyncreaddir,
        Opt_dcache,
        Opt_nodcache,
        Opt_ino32,
+       Opt_noino32,
 };
 
 static match_table_t fsopt_tokens = {
@@ -153,10 +155,12 @@ static match_table_t fsopt_tokens = {
        {Opt_nodirstat, "nodirstat"},
        {Opt_rbytes, "rbytes"},
        {Opt_norbytes, "norbytes"},
+       {Opt_asyncreaddir, "asyncreaddir"},
        {Opt_noasyncreaddir, "noasyncreaddir"},
        {Opt_dcache, "dcache"},
        {Opt_nodcache, "nodcache"},
        {Opt_ino32, "ino32"},
+       {Opt_noino32, "noino32"},
        {-1, NULL}
 };
 
@@ -232,6 +236,9 @@ static int parse_fsopt_token(char *c, void *private)
        case Opt_norbytes:
                fsopt->flags &= ~CEPH_MOUNT_OPT_RBYTES;
                break;
+       case Opt_asyncreaddir:
+               fsopt->flags &= ~CEPH_MOUNT_OPT_NOASYNCREADDIR;
+               break;
        case Opt_noasyncreaddir:
                fsopt->flags |= CEPH_MOUNT_OPT_NOASYNCREADDIR;
                break;
@@ -244,6 +251,9 @@ static int parse_fsopt_token(char *c, void *private)
        case Opt_ino32:
                fsopt->flags |= CEPH_MOUNT_OPT_INO32;
                break;
+       case Opt_noino32:
+               fsopt->flags &= ~CEPH_MOUNT_OPT_INO32;
+               break;
        default:
                BUG_ON(token);
        }
@@ -334,10 +344,12 @@ static int parse_mount_options(struct ceph_mount_options **pfsopt,
        *path += 2;
        dout("server path '%s'\n", *path);
 
-       err = ceph_parse_options(popt, options, dev_name, dev_name_end,
+       *popt = ceph_parse_options(options, dev_name, dev_name_end,
                                 parse_fsopt_token, (void *)fsopt);
-       if (err)
+       if (IS_ERR(*popt)) {
+               err = PTR_ERR(*popt);
                goto out;
+       }
 
        /* success */
        *pfsopt = fsopt;
@@ -926,6 +938,7 @@ static int __init init_ceph(void)
        if (ret)
                goto out;
 
+       ceph_xattr_init();
        ret = register_filesystem(&ceph_fs_type);
        if (ret)
                goto out_icache;
@@ -935,6 +948,7 @@ static int __init init_ceph(void)
        return 0;
 
 out_icache:
+       ceph_xattr_exit();
        destroy_caches();
 out:
        return ret;
@@ -944,6 +958,7 @@ static void __exit exit_ceph(void)
 {
        dout("exit_ceph\n");
        unregister_filesystem(&ceph_fs_type);
+       ceph_xattr_exit();
        destroy_caches();
 }
 
index 1421f3d875a22e34e1449bde4d46327678d114fe..fc35036d258d7b830c37f5cba078f53c53e7d574 100644 (file)
@@ -367,7 +367,7 @@ static inline u32 ceph_ino_to_ino32(__u64 vino)
        u32 ino = vino & 0xffffffff;
        ino ^= vino >> 32;
        if (!ino)
-               ino = 1;
+               ino = 2;
        return ino;
 }
 
@@ -733,6 +733,8 @@ extern ssize_t ceph_listxattr(struct dentry *, char *, size_t);
 extern int ceph_removexattr(struct dentry *, const char *);
 extern void __ceph_build_xattrs_blob(struct ceph_inode_info *ci);
 extern void __ceph_destroy_xattrs(struct ceph_inode_info *ci);
+extern void __init ceph_xattr_init(void);
+extern void ceph_xattr_exit(void);
 
 /* caps.c */
 extern const char *ceph_cap_string(int c);
index a76f697303d9e5db700598fc817008494908bb99..35b86331d8a5ce84c311e9eb2730757f80149179 100644 (file)
@@ -8,9 +8,12 @@
 #include <linux/xattr.h>
 #include <linux/slab.h>
 
+#define XATTR_CEPH_PREFIX "ceph."
+#define XATTR_CEPH_PREFIX_LEN (sizeof (XATTR_CEPH_PREFIX) - 1)
+
 static bool ceph_is_valid_xattr(const char *name)
 {
-       return !strncmp(name, "ceph.", 5) ||
+       return !strncmp(name, XATTR_CEPH_PREFIX, XATTR_CEPH_PREFIX_LEN) ||
               !strncmp(name, XATTR_SECURITY_PREFIX,
                        XATTR_SECURITY_PREFIX_LEN) ||
               !strncmp(name, XATTR_TRUSTED_PREFIX, XATTR_TRUSTED_PREFIX_LEN) ||
@@ -21,79 +24,91 @@ static bool ceph_is_valid_xattr(const char *name)
  * These define virtual xattrs exposing the recursive directory
  * statistics and layout metadata.
  */
-struct ceph_vxattr_cb {
-       bool readonly;
+struct ceph_vxattr {
        char *name;
+       size_t name_size;       /* strlen(name) + 1 (for '\0') */
        size_t (*getxattr_cb)(struct ceph_inode_info *ci, char *val,
                              size_t size);
+       bool readonly;
 };
 
 /* directories */
 
-static size_t ceph_vxattrcb_entries(struct ceph_inode_info *ci, char *val,
+static size_t ceph_vxattrcb_dir_entries(struct ceph_inode_info *ci, char *val,
                                        size_t size)
 {
        return snprintf(val, size, "%lld", ci->i_files + ci->i_subdirs);
 }
 
-static size_t ceph_vxattrcb_files(struct ceph_inode_info *ci, char *val,
+static size_t ceph_vxattrcb_dir_files(struct ceph_inode_info *ci, char *val,
                                      size_t size)
 {
        return snprintf(val, size, "%lld", ci->i_files);
 }
 
-static size_t ceph_vxattrcb_subdirs(struct ceph_inode_info *ci, char *val,
+static size_t ceph_vxattrcb_dir_subdirs(struct ceph_inode_info *ci, char *val,
                                        size_t size)
 {
        return snprintf(val, size, "%lld", ci->i_subdirs);
 }
 
-static size_t ceph_vxattrcb_rentries(struct ceph_inode_info *ci, char *val,
+static size_t ceph_vxattrcb_dir_rentries(struct ceph_inode_info *ci, char *val,
                                         size_t size)
 {
        return snprintf(val, size, "%lld", ci->i_rfiles + ci->i_rsubdirs);
 }
 
-static size_t ceph_vxattrcb_rfiles(struct ceph_inode_info *ci, char *val,
+static size_t ceph_vxattrcb_dir_rfiles(struct ceph_inode_info *ci, char *val,
                                       size_t size)
 {
        return snprintf(val, size, "%lld", ci->i_rfiles);
 }
 
-static size_t ceph_vxattrcb_rsubdirs(struct ceph_inode_info *ci, char *val,
+static size_t ceph_vxattrcb_dir_rsubdirs(struct ceph_inode_info *ci, char *val,
                                         size_t size)
 {
        return snprintf(val, size, "%lld", ci->i_rsubdirs);
 }
 
-static size_t ceph_vxattrcb_rbytes(struct ceph_inode_info *ci, char *val,
+static size_t ceph_vxattrcb_dir_rbytes(struct ceph_inode_info *ci, char *val,
                                       size_t size)
 {
        return snprintf(val, size, "%lld", ci->i_rbytes);
 }
 
-static size_t ceph_vxattrcb_rctime(struct ceph_inode_info *ci, char *val,
+static size_t ceph_vxattrcb_dir_rctime(struct ceph_inode_info *ci, char *val,
                                        size_t size)
 {
-       return snprintf(val, size, "%ld.%ld", (long)ci->i_rctime.tv_sec,
+       return snprintf(val, size, "%ld.%09ld", (long)ci->i_rctime.tv_sec,
                        (long)ci->i_rctime.tv_nsec);
 }
 
-static struct ceph_vxattr_cb ceph_dir_vxattrs[] = {
-       { true, "ceph.dir.entries", ceph_vxattrcb_entries},
-       { true, "ceph.dir.files", ceph_vxattrcb_files},
-       { true, "ceph.dir.subdirs", ceph_vxattrcb_subdirs},
-       { true, "ceph.dir.rentries", ceph_vxattrcb_rentries},
-       { true, "ceph.dir.rfiles", ceph_vxattrcb_rfiles},
-       { true, "ceph.dir.rsubdirs", ceph_vxattrcb_rsubdirs},
-       { true, "ceph.dir.rbytes", ceph_vxattrcb_rbytes},
-       { true, "ceph.dir.rctime", ceph_vxattrcb_rctime},
-       { true, NULL, NULL }
+#define CEPH_XATTR_NAME(_type, _name)  XATTR_CEPH_PREFIX #_type "." #_name
+
+#define XATTR_NAME_CEPH(_type, _name) \
+               { \
+                       .name = CEPH_XATTR_NAME(_type, _name), \
+                       .name_size = sizeof (CEPH_XATTR_NAME(_type, _name)), \
+                       .getxattr_cb = ceph_vxattrcb_ ## _type ## _ ## _name, \
+                       .readonly = true, \
+               }
+
+static struct ceph_vxattr ceph_dir_vxattrs[] = {
+       XATTR_NAME_CEPH(dir, entries),
+       XATTR_NAME_CEPH(dir, files),
+       XATTR_NAME_CEPH(dir, subdirs),
+       XATTR_NAME_CEPH(dir, rentries),
+       XATTR_NAME_CEPH(dir, rfiles),
+       XATTR_NAME_CEPH(dir, rsubdirs),
+       XATTR_NAME_CEPH(dir, rbytes),
+       XATTR_NAME_CEPH(dir, rctime),
+       { 0 }   /* Required table terminator */
 };
+static size_t ceph_dir_vxattrs_name_size;      /* total size of all names */
 
 /* files */
 
-static size_t ceph_vxattrcb_layout(struct ceph_inode_info *ci, char *val,
+static size_t ceph_vxattrcb_file_layout(struct ceph_inode_info *ci, char *val,
                                   size_t size)
 {
        int ret;
@@ -103,21 +118,32 @@ static size_t ceph_vxattrcb_layout(struct ceph_inode_info *ci, char *val,
                (unsigned long long)ceph_file_layout_su(ci->i_layout),
                (unsigned long long)ceph_file_layout_stripe_count(ci->i_layout),
                (unsigned long long)ceph_file_layout_object_size(ci->i_layout));
-       if (ceph_file_layout_pg_preferred(ci->i_layout))
-               ret += snprintf(val + ret, size, "preferred_osd=%lld\n",
+
+       if (ceph_file_layout_pg_preferred(ci->i_layout) >= 0) {
+               val += ret;
+               size -= ret;
+               ret += snprintf(val, size, "preferred_osd=%lld\n",
                            (unsigned long long)ceph_file_layout_pg_preferred(
                                    ci->i_layout));
+       }
+
        return ret;
 }
 
-static struct ceph_vxattr_cb ceph_file_vxattrs[] = {
-       { true, "ceph.file.layout", ceph_vxattrcb_layout},
+static struct ceph_vxattr ceph_file_vxattrs[] = {
+       XATTR_NAME_CEPH(file, layout),
        /* The following extended attribute name is deprecated */
-       { true, "ceph.layout", ceph_vxattrcb_layout},
-       { true, NULL, NULL }
+       {
+               .name = XATTR_CEPH_PREFIX "layout",
+               .name_size = sizeof (XATTR_CEPH_PREFIX "layout"),
+               .getxattr_cb = ceph_vxattrcb_file_layout,
+               .readonly = true,
+       },
+       { 0 }   /* Required table terminator */
 };
+static size_t ceph_file_vxattrs_name_size;     /* total size of all names */
 
-static struct ceph_vxattr_cb *ceph_inode_vxattrs(struct inode *inode)
+static struct ceph_vxattr *ceph_inode_vxattrs(struct inode *inode)
 {
        if (S_ISDIR(inode->i_mode))
                return ceph_dir_vxattrs;
@@ -126,14 +152,59 @@ static struct ceph_vxattr_cb *ceph_inode_vxattrs(struct inode *inode)
        return NULL;
 }
 
-static struct ceph_vxattr_cb *ceph_match_vxattr(struct ceph_vxattr_cb *vxattr,
+static size_t ceph_vxattrs_name_size(struct ceph_vxattr *vxattrs)
+{
+       if (vxattrs == ceph_dir_vxattrs)
+               return ceph_dir_vxattrs_name_size;
+       if (vxattrs == ceph_file_vxattrs)
+               return ceph_file_vxattrs_name_size;
+       BUG();
+
+       return 0;
+}
+
+/*
+ * Compute the aggregate size (including terminating '\0') of all
+ * virtual extended attribute names in the given vxattr table.
+ */
+static size_t __init vxattrs_name_size(struct ceph_vxattr *vxattrs)
+{
+       struct ceph_vxattr *vxattr;
+       size_t size = 0;
+
+       for (vxattr = vxattrs; vxattr->name; vxattr++)
+               size += vxattr->name_size;
+
+       return size;
+}
+
+/* Routines called at initialization and exit time */
+
+void __init ceph_xattr_init(void)
+{
+       ceph_dir_vxattrs_name_size = vxattrs_name_size(ceph_dir_vxattrs);
+       ceph_file_vxattrs_name_size = vxattrs_name_size(ceph_file_vxattrs);
+}
+
+void ceph_xattr_exit(void)
+{
+       ceph_dir_vxattrs_name_size = 0;
+       ceph_file_vxattrs_name_size = 0;
+}
+
+static struct ceph_vxattr *ceph_match_vxattr(struct inode *inode,
                                                const char *name)
 {
-       do {
-               if (strcmp(vxattr->name, name) == 0)
-                       return vxattr;
-               vxattr++;
-       } while (vxattr->name);
+       struct ceph_vxattr *vxattr = ceph_inode_vxattrs(inode);
+
+       if (vxattr) {
+               while (vxattr->name) {
+                       if (!strcmp(vxattr->name, name))
+                               return vxattr;
+                       vxattr++;
+               }
+       }
+
        return NULL;
 }
 
@@ -502,17 +573,15 @@ ssize_t ceph_getxattr(struct dentry *dentry, const char *name, void *value,
 {
        struct inode *inode = dentry->d_inode;
        struct ceph_inode_info *ci = ceph_inode(inode);
-       struct ceph_vxattr_cb *vxattrs = ceph_inode_vxattrs(inode);
        int err;
        struct ceph_inode_xattr *xattr;
-       struct ceph_vxattr_cb *vxattr = NULL;
+       struct ceph_vxattr *vxattr = NULL;
 
        if (!ceph_is_valid_xattr(name))
                return -ENODATA;
 
        /* let's see if a virtual xattr was requested */
-       if (vxattrs)
-               vxattr = ceph_match_vxattr(vxattrs, name);
+       vxattr = ceph_match_vxattr(inode, name);
 
        spin_lock(&ci->i_ceph_lock);
        dout("getxattr %p ver=%lld index_ver=%lld\n", inode,
@@ -568,7 +637,7 @@ ssize_t ceph_listxattr(struct dentry *dentry, char *names, size_t size)
 {
        struct inode *inode = dentry->d_inode;
        struct ceph_inode_info *ci = ceph_inode(inode);
-       struct ceph_vxattr_cb *vxattrs = ceph_inode_vxattrs(inode);
+       struct ceph_vxattr *vxattrs = ceph_inode_vxattrs(inode);
        u32 vir_namelen = 0;
        u32 namelen;
        int err;
@@ -596,11 +665,12 @@ ssize_t ceph_listxattr(struct dentry *dentry, char *names, size_t size)
                goto out;
 
 list_xattr:
-       vir_namelen = 0;
-       /* include virtual dir xattrs */
-       if (vxattrs)
-               for (i = 0; vxattrs[i].name; i++)
-                       vir_namelen += strlen(vxattrs[i].name) + 1;
+       /*
+        * Start with the virtual xattr names (if any), including the
+        * terminating '\0' character for each.
+        */
+       vir_namelen = ceph_vxattrs_name_size(vxattrs);
+
        /* adding 1 byte per each variable due to the null termination */
        namelen = vir_namelen + ci->i_xattrs.names_size + ci->i_xattrs.count;
        err = -ERANGE;
@@ -698,17 +768,17 @@ int ceph_setxattr(struct dentry *dentry, const char *name,
                  const void *value, size_t size, int flags)
 {
        struct inode *inode = dentry->d_inode;
+       struct ceph_vxattr *vxattr;
        struct ceph_inode_info *ci = ceph_inode(inode);
-       struct ceph_vxattr_cb *vxattrs = ceph_inode_vxattrs(inode);
+       int issued;
        int err;
+       int dirty;
        int name_len = strlen(name);
        int val_len = size;
        char *newname = NULL;
        char *newval = NULL;
        struct ceph_inode_xattr *xattr = NULL;
-       int issued;
        int required_blob_size;
-       int dirty;
 
        if (ceph_snap(inode) != CEPH_NOSNAP)
                return -EROFS;
@@ -716,12 +786,9 @@ int ceph_setxattr(struct dentry *dentry, const char *name,
        if (!ceph_is_valid_xattr(name))
                return -EOPNOTSUPP;
 
-       if (vxattrs) {
-               struct ceph_vxattr_cb *vxattr =
-                       ceph_match_vxattr(vxattrs, name);
-               if (vxattr && vxattr->readonly)
-                       return -EOPNOTSUPP;
-       }
+       vxattr = ceph_match_vxattr(inode, name);
+       if (vxattr && vxattr->readonly)
+               return -EOPNOTSUPP;
 
        /* preallocate memory for xattr name, value, index node */
        err = -ENOMEM;
@@ -730,11 +797,9 @@ int ceph_setxattr(struct dentry *dentry, const char *name,
                goto out;
 
        if (val_len) {
-               newval = kmalloc(val_len + 1, GFP_NOFS);
+               newval = kmemdup(value, val_len, GFP_NOFS);
                if (!newval)
                        goto out;
-               memcpy(newval, value, val_len);
-               newval[val_len] = '\0';
        }
 
        xattr = kmalloc(sizeof(struct ceph_inode_xattr), GFP_NOFS);
@@ -744,6 +809,7 @@ int ceph_setxattr(struct dentry *dentry, const char *name,
        spin_lock(&ci->i_ceph_lock);
 retry:
        issued = __ceph_caps_issued(ci, NULL);
+       dout("setxattr %p issued %s\n", inode, ceph_cap_string(issued));
        if (!(issued & CEPH_CAP_XATTR_EXCL))
                goto do_sync;
        __build_xattrs(inode);
@@ -752,7 +818,7 @@ retry:
 
        if (!ci->i_xattrs.prealloc_blob ||
            required_blob_size > ci->i_xattrs.prealloc_blob->alloc_len) {
-               struct ceph_buffer *blob = NULL;
+               struct ceph_buffer *blob;
 
                spin_unlock(&ci->i_ceph_lock);
                dout(" preaallocating new blob size=%d\n", required_blob_size);
@@ -766,12 +832,13 @@ retry:
                goto retry;
        }
 
-       dout("setxattr %p issued %s\n", inode, ceph_cap_string(issued));
        err = __set_xattr(ci, newname, name_len, newval,
                          val_len, 1, 1, 1, &xattr);
+
        dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL);
        ci->i_xattrs.dirty = true;
        inode->i_ctime = CURRENT_TIME;
+
        spin_unlock(&ci->i_ceph_lock);
        if (dirty)
                __mark_inode_dirty(inode, dirty);
@@ -816,8 +883,8 @@ static int ceph_send_removexattr(struct dentry *dentry, const char *name)
 int ceph_removexattr(struct dentry *dentry, const char *name)
 {
        struct inode *inode = dentry->d_inode;
+       struct ceph_vxattr *vxattr;
        struct ceph_inode_info *ci = ceph_inode(inode);
-       struct ceph_vxattr_cb *vxattrs = ceph_inode_vxattrs(inode);
        int issued;
        int err;
        int required_blob_size;
@@ -829,22 +896,19 @@ int ceph_removexattr(struct dentry *dentry, const char *name)
        if (!ceph_is_valid_xattr(name))
                return -EOPNOTSUPP;
 
-       if (vxattrs) {
-               struct ceph_vxattr_cb *vxattr =
-                       ceph_match_vxattr(vxattrs, name);
-               if (vxattr && vxattr->readonly)
-                       return -EOPNOTSUPP;
-       }
+       vxattr = ceph_match_vxattr(inode, name);
+       if (vxattr && vxattr->readonly)
+               return -EOPNOTSUPP;
 
        err = -ENOMEM;
        spin_lock(&ci->i_ceph_lock);
-       __build_xattrs(inode);
 retry:
        issued = __ceph_caps_issued(ci, NULL);
        dout("removexattr %p issued %s\n", inode, ceph_cap_string(issued));
 
        if (!(issued & CEPH_CAP_XATTR_EXCL))
                goto do_sync;
+       __build_xattrs(inode);
 
        required_blob_size = __get_required_blob_size(ci, 0, 0);
 
@@ -865,10 +929,10 @@ retry:
        }
 
        err = __remove_xattr_by_name(ceph_inode(inode), name);
+
        dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL);
        ci->i_xattrs.dirty = true;
        inode->i_ctime = CURRENT_TIME;
-
        spin_unlock(&ci->i_ceph_lock);
        if (dirty)
                __mark_inode_dirty(inode, dirty);
index e8cf0ccd1a8dd180c0a21c8dca40c139bbf041b6..e71d683982a6f651a83a22fd7ef12bf7af6f655f 100644 (file)
@@ -208,7 +208,7 @@ extern struct kmem_cache *ceph_cap_cachep;
 extern struct kmem_cache *ceph_dentry_cachep;
 extern struct kmem_cache *ceph_file_cachep;
 
-extern int ceph_parse_options(struct ceph_options **popt, char *options,
+extern struct ceph_options *ceph_parse_options(char *options,
                              const char *dev_name, const char *dev_name_end,
                              int (*parse_extra_token)(char *c, void *private),
                              void *private);
index ffbeb2c217b442036a2675cc4cb4e6d5e1bf0d8b..3bff047f6b0f19d1037e4e7157fc785dd213f4cc 100644 (file)
@@ -14,8 +14,6 @@
 struct ceph_msg;
 struct ceph_connection;
 
-extern struct workqueue_struct *ceph_msgr_wq;       /* receive work queue */
-
 /*
  * Ceph defines these callbacks for handling connection events.
  */
@@ -54,7 +52,6 @@ struct ceph_connection_operations {
 struct ceph_messenger {
        struct ceph_entity_inst inst;    /* my name+address */
        struct ceph_entity_addr my_enc_addr;
-       struct page *zero_page;          /* used in certain error cases */
 
        bool nocrc;
 
@@ -101,7 +98,7 @@ struct ceph_msg {
 struct ceph_msg_pos {
        int page, page_pos;  /* which page; offset in page */
        int data_pos;        /* offset in data payload */
-       int did_page_crc;    /* true if we've calculated crc for current page */
+       bool did_page_crc;   /* true if we've calculated crc for current page */
 };
 
 /* ceph connection fault delay defaults, for exponential backoff */
index 761ad9d6cc3b12fc4d6d8c10022e4c877559f5c8..cc913193d992d98f80949f7f167ff7ecb48fdcc1 100644 (file)
@@ -201,7 +201,9 @@ enum {
        Opt_ip,
        Opt_last_string,
        /* string args above */
+       Opt_share,
        Opt_noshare,
+       Opt_crc,
        Opt_nocrc,
 };
 
@@ -217,7 +219,9 @@ static match_table_t opt_tokens = {
        {Opt_key, "key=%s"},
        {Opt_ip, "ip=%s"},
        /* string args above */
+       {Opt_share, "share"},
        {Opt_noshare, "noshare"},
+       {Opt_crc, "crc"},
        {Opt_nocrc, "nocrc"},
        {-1, NULL}
 };
@@ -277,10 +281,11 @@ out:
        return err;
 }
 
-int ceph_parse_options(struct ceph_options **popt, char *options,
-                      const char *dev_name, const char *dev_name_end,
-                      int (*parse_extra_token)(char *c, void *private),
-                      void *private)
+struct ceph_options *
+ceph_parse_options(char *options, const char *dev_name,
+                       const char *dev_name_end,
+                       int (*parse_extra_token)(char *c, void *private),
+                       void *private)
 {
        struct ceph_options *opt;
        const char *c;
@@ -289,7 +294,7 @@ int ceph_parse_options(struct ceph_options **popt, char *options,
 
        opt = kzalloc(sizeof(*opt), GFP_KERNEL);
        if (!opt)
-               return err;
+               return ERR_PTR(-ENOMEM);
        opt->mon_addr = kcalloc(CEPH_MAX_MON, sizeof(*opt->mon_addr),
                                GFP_KERNEL);
        if (!opt->mon_addr)
@@ -398,10 +403,16 @@ int ceph_parse_options(struct ceph_options **popt, char *options,
                        opt->mount_timeout = intval;
                        break;
 
+               case Opt_share:
+                       opt->flags &= ~CEPH_OPT_NOSHARE;
+                       break;
                case Opt_noshare:
                        opt->flags |= CEPH_OPT_NOSHARE;
                        break;
 
+               case Opt_crc:
+                       opt->flags &= ~CEPH_OPT_NOCRC;
+                       break;
                case Opt_nocrc:
                        opt->flags |= CEPH_OPT_NOCRC;
                        break;
@@ -412,12 +423,11 @@ int ceph_parse_options(struct ceph_options **popt, char *options,
        }
 
        /* success */
-       *popt = opt;
-       return 0;
+       return opt;
 
 out:
        ceph_destroy_options(opt);
-       return err;
+       return ERR_PTR(err);
 }
 EXPORT_SYMBOL(ceph_parse_options);
 
index ad5b70801f37788edbb71b6d95cbe7d25ab94abd..f0993af2ae4deff311f2da78a14bcbe14b2fb8c2 100644 (file)
@@ -38,48 +38,54 @@ static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE;
 static struct lock_class_key socket_class;
 #endif
 
+/*
+ * When skipping (ignoring) a block of input we read it into a "skip
+ * buffer," which is this many bytes in size.
+ */
+#define SKIP_BUF_SIZE  1024
 
 static void queue_con(struct ceph_connection *con);
 static void con_work(struct work_struct *);
 static void ceph_fault(struct ceph_connection *con);
 
 /*
- * nicely render a sockaddr as a string.
+ * Nicely render a sockaddr as a string.  An array of formatted
+ * strings is used, to approximate reentrancy.
  */
-#define MAX_ADDR_STR 20
-#define MAX_ADDR_STR_LEN 60
-static char addr_str[MAX_ADDR_STR][MAX_ADDR_STR_LEN];
-static DEFINE_SPINLOCK(addr_str_lock);
-static int last_addr_str;
+#define ADDR_STR_COUNT_LOG     5       /* log2(# address strings in array) */
+#define ADDR_STR_COUNT         (1 << ADDR_STR_COUNT_LOG)
+#define ADDR_STR_COUNT_MASK    (ADDR_STR_COUNT - 1)
+#define MAX_ADDR_STR_LEN       64      /* 54 is enough */
+
+static char addr_str[ADDR_STR_COUNT][MAX_ADDR_STR_LEN];
+static atomic_t addr_str_seq = ATOMIC_INIT(0);
+
+static struct page *zero_page;         /* used in certain error cases */
 
 const char *ceph_pr_addr(const struct sockaddr_storage *ss)
 {
        int i;
        char *s;
-       struct sockaddr_in *in4 = (void *)ss;
-       struct sockaddr_in6 *in6 = (void *)ss;
-
-       spin_lock(&addr_str_lock);
-       i = last_addr_str++;
-       if (last_addr_str == MAX_ADDR_STR)
-               last_addr_str = 0;
-       spin_unlock(&addr_str_lock);
+       struct sockaddr_in *in4 = (struct sockaddr_in *) ss;
+       struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) ss;
+
+       i = atomic_inc_return(&addr_str_seq) & ADDR_STR_COUNT_MASK;
        s = addr_str[i];
 
        switch (ss->ss_family) {
        case AF_INET:
-               snprintf(s, MAX_ADDR_STR_LEN, "%pI4:%u", &in4->sin_addr,
-                        (unsigned int)ntohs(in4->sin_port));
+               snprintf(s, MAX_ADDR_STR_LEN, "%pI4:%hu", &in4->sin_addr,
+                        ntohs(in4->sin_port));
                break;
 
        case AF_INET6:
-               snprintf(s, MAX_ADDR_STR_LEN, "[%pI6c]:%u", &in6->sin6_addr,
-                        (unsigned int)ntohs(in6->sin6_port));
+               snprintf(s, MAX_ADDR_STR_LEN, "[%pI6c]:%hu", &in6->sin6_addr,
+                        ntohs(in6->sin6_port));
                break;
 
        default:
-               snprintf(s, MAX_ADDR_STR_LEN, "(unknown sockaddr family %d)",
-                        (int)ss->ss_family);
+               snprintf(s, MAX_ADDR_STR_LEN, "(unknown sockaddr family %hu)",
+                        ss->ss_family);
        }
 
        return s;
@@ -95,22 +101,43 @@ static void encode_my_addr(struct ceph_messenger *msgr)
 /*
  * work queue for all reading and writing to/from the socket.
  */
-struct workqueue_struct *ceph_msgr_wq;
+static struct workqueue_struct *ceph_msgr_wq;
+
+void _ceph_msgr_exit(void)
+{
+       if (ceph_msgr_wq) {
+               destroy_workqueue(ceph_msgr_wq);
+               ceph_msgr_wq = NULL;
+       }
+
+       BUG_ON(zero_page == NULL);
+       kunmap(zero_page);
+       page_cache_release(zero_page);
+       zero_page = NULL;
+}
 
 int ceph_msgr_init(void)
 {
+       BUG_ON(zero_page != NULL);
+       zero_page = ZERO_PAGE(0);
+       page_cache_get(zero_page);
+
        ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_NON_REENTRANT, 0);
-       if (!ceph_msgr_wq) {
-               pr_err("msgr_init failed to create workqueue\n");
-               return -ENOMEM;
-       }
-       return 0;
+       if (ceph_msgr_wq)
+               return 0;
+
+       pr_err("msgr_init failed to create workqueue\n");
+       _ceph_msgr_exit();
+
+       return -ENOMEM;
 }
 EXPORT_SYMBOL(ceph_msgr_init);
 
 void ceph_msgr_exit(void)
 {
-       destroy_workqueue(ceph_msgr_wq);
+       BUG_ON(ceph_msgr_wq == NULL);
+
+       _ceph_msgr_exit();
 }
 EXPORT_SYMBOL(ceph_msgr_exit);
 
@@ -128,8 +155,8 @@ EXPORT_SYMBOL(ceph_msgr_flush);
 /* data available on socket, or listen socket received a connect */
 static void ceph_data_ready(struct sock *sk, int count_unused)
 {
-       struct ceph_connection *con =
-               (struct ceph_connection *)sk->sk_user_data;
+       struct ceph_connection *con = sk->sk_user_data;
+
        if (sk->sk_state != TCP_CLOSE_WAIT) {
                dout("ceph_data_ready on %p state = %lu, queueing work\n",
                     con, con->state);
@@ -140,26 +167,30 @@ static void ceph_data_ready(struct sock *sk, int count_unused)
 /* socket has buffer space for writing */
 static void ceph_write_space(struct sock *sk)
 {
-       struct ceph_connection *con =
-               (struct ceph_connection *)sk->sk_user_data;
+       struct ceph_connection *con = sk->sk_user_data;
 
-       /* only queue to workqueue if there is data we want to write. */
+       /* Only queue to the workqueue if there is data we want to write
+        * and there is sufficient space in the socket buffer to accept
+        * more data.  Clear SOCK_NOSPACE so that ceph_write_space()
+        * doesn't get called again until try_write() fills the socket
+        * buffer.  See net/ipv4/tcp_input.c:tcp_check_space()
+        * and net/core/stream.c:sk_stream_write_space().
+        */
        if (test_bit(WRITE_PENDING, &con->state)) {
-               dout("ceph_write_space %p queueing write work\n", con);
-               queue_con(con);
+               if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) {
+                       dout("ceph_write_space %p queueing write work\n", con);
+                       clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
+                       queue_con(con);
+               }
        } else {
                dout("ceph_write_space %p nothing to write\n", con);
        }
-
-       /* since we have our own write_space, clear the SOCK_NOSPACE flag */
-       clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
 }
 
 /* socket's state has changed */
 static void ceph_state_change(struct sock *sk)
 {
-       struct ceph_connection *con =
-               (struct ceph_connection *)sk->sk_user_data;
+       struct ceph_connection *con = sk->sk_user_data;
 
        dout("ceph_state_change %p state = %lu sk_state = %u\n",
             con, con->state, sk->sk_state);
@@ -184,6 +215,8 @@ static void ceph_state_change(struct sock *sk)
                dout("ceph_state_change TCP_ESTABLISHED\n");
                queue_con(con);
                break;
+       default:        /* Everything else is uninteresting */
+               break;
        }
 }
 
@@ -194,7 +227,7 @@ static void set_sock_callbacks(struct socket *sock,
                               struct ceph_connection *con)
 {
        struct sock *sk = sock->sk;
-       sk->sk_user_data = (void *)con;
+       sk->sk_user_data = con;
        sk->sk_data_ready = ceph_data_ready;
        sk->sk_write_space = ceph_write_space;
        sk->sk_state_change = ceph_state_change;
@@ -208,7 +241,7 @@ static void set_sock_callbacks(struct socket *sock,
 /*
  * initiate connection to a remote socket.
  */
-static struct socket *ceph_tcp_connect(struct ceph_connection *con)
+static int ceph_tcp_connect(struct ceph_connection *con)
 {
        struct sockaddr_storage *paddr = &con->peer_addr.in_addr;
        struct socket *sock;
@@ -218,8 +251,7 @@ static struct socket *ceph_tcp_connect(struct ceph_connection *con)
        ret = sock_create_kern(con->peer_addr.in_addr.ss_family, SOCK_STREAM,
                               IPPROTO_TCP, &sock);
        if (ret)
-               return ERR_PTR(ret);
-       con->sock = sock;
+               return ret;
        sock->sk->sk_allocation = GFP_NOFS;
 
 #ifdef CONFIG_LOCKDEP
@@ -236,19 +268,17 @@ static struct socket *ceph_tcp_connect(struct ceph_connection *con)
                dout("connect %s EINPROGRESS sk_state = %u\n",
                     ceph_pr_addr(&con->peer_addr.in_addr),
                     sock->sk->sk_state);
-               ret = 0;
-       }
-       if (ret < 0) {
+       } else if (ret < 0) {
                pr_err("connect %s error %d\n",
                       ceph_pr_addr(&con->peer_addr.in_addr), ret);
                sock_release(sock);
-               con->sock = NULL;
                con->error_msg = "connect error";
+
+               return ret;
        }
+       con->sock = sock;
 
-       if (ret < 0)
-               return ERR_PTR(ret);
-       return sock;
+       return 0;
 }
 
 static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
@@ -284,6 +314,19 @@ static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
        return r;
 }
 
+static int ceph_tcp_sendpage(struct socket *sock, struct page *page,
+                    int offset, size_t size, int more)
+{
+       int flags = MSG_DONTWAIT | MSG_NOSIGNAL | (more ? MSG_MORE : MSG_EOR);
+       int ret;
+
+       ret = kernel_sendpage(sock, page, offset, size, flags);
+       if (ret == -EAGAIN)
+               ret = 0;
+
+       return ret;
+}
+
 
 /*
  * Shutdown/close the socket for the given connection.
@@ -391,22 +434,23 @@ bool ceph_con_opened(struct ceph_connection *con)
  */
 struct ceph_connection *ceph_con_get(struct ceph_connection *con)
 {
-       dout("con_get %p nref = %d -> %d\n", con,
-            atomic_read(&con->nref), atomic_read(&con->nref) + 1);
-       if (atomic_inc_not_zero(&con->nref))
-               return con;
-       return NULL;
+       int nref = __atomic_add_unless(&con->nref, 1, 0);
+
+       dout("con_get %p nref = %d -> %d\n", con, nref, nref + 1);
+
+       return nref ? con : NULL;
 }
 
 void ceph_con_put(struct ceph_connection *con)
 {
-       dout("con_put %p nref = %d -> %d\n", con,
-            atomic_read(&con->nref), atomic_read(&con->nref) - 1);
-       BUG_ON(atomic_read(&con->nref) == 0);
-       if (atomic_dec_and_test(&con->nref)) {
+       int nref = atomic_dec_return(&con->nref);
+
+       BUG_ON(nref < 0);
+       if (nref == 0) {
                BUG_ON(con->sock);
                kfree(con);
        }
+       dout("con_put %p nref = %d -> %d\n", con, nref + 1, nref);
 }
 
 /*
@@ -442,14 +486,35 @@ static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt)
        return ret;
 }
 
+static void ceph_con_out_kvec_reset(struct ceph_connection *con)
+{
+       con->out_kvec_left = 0;
+       con->out_kvec_bytes = 0;
+       con->out_kvec_cur = &con->out_kvec[0];
+}
+
+static void ceph_con_out_kvec_add(struct ceph_connection *con,
+                               size_t size, void *data)
+{
+       int index;
+
+       index = con->out_kvec_left;
+       BUG_ON(index >= ARRAY_SIZE(con->out_kvec));
+
+       con->out_kvec[index].iov_len = size;
+       con->out_kvec[index].iov_base = data;
+       con->out_kvec_left++;
+       con->out_kvec_bytes += size;
+}
 
 /*
  * Prepare footer for currently outgoing message, and finish things
  * off.  Assumes out_kvec* are already valid.. we just add on to the end.
  */
-static void prepare_write_message_footer(struct ceph_connection *con, int v)
+static void prepare_write_message_footer(struct ceph_connection *con)
 {
        struct ceph_msg *m = con->out_msg;
+       int v = con->out_kvec_left;
 
        dout("prepare_write_message_footer %p\n", con);
        con->out_kvec_is_msg = true;
@@ -467,9 +532,9 @@ static void prepare_write_message_footer(struct ceph_connection *con, int v)
 static void prepare_write_message(struct ceph_connection *con)
 {
        struct ceph_msg *m;
-       int v = 0;
+       u32 crc;
 
-       con->out_kvec_bytes = 0;
+       ceph_con_out_kvec_reset(con);
        con->out_kvec_is_msg = true;
        con->out_msg_done = false;
 
@@ -477,16 +542,13 @@ static void prepare_write_message(struct ceph_connection *con)
         * TCP packet that's a good thing. */
        if (con->in_seq > con->in_seq_acked) {
                con->in_seq_acked = con->in_seq;
-               con->out_kvec[v].iov_base = &tag_ack;
-               con->out_kvec[v++].iov_len = 1;
+               ceph_con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
                con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
-               con->out_kvec[v].iov_base = &con->out_temp_ack;
-               con->out_kvec[v++].iov_len = sizeof(con->out_temp_ack);
-               con->out_kvec_bytes = 1 + sizeof(con->out_temp_ack);
+               ceph_con_out_kvec_add(con, sizeof (con->out_temp_ack),
+                       &con->out_temp_ack);
        }
 
-       m = list_first_entry(&con->out_queue,
-                      struct ceph_msg, list_head);
+       m = list_first_entry(&con->out_queue, struct ceph_msg, list_head);
        con->out_msg = m;
 
        /* put message on sent list */
@@ -510,30 +572,26 @@ static void prepare_write_message(struct ceph_connection *con)
        BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len);
 
        /* tag + hdr + front + middle */
-       con->out_kvec[v].iov_base = &tag_msg;
-       con->out_kvec[v++].iov_len = 1;
-       con->out_kvec[v].iov_base = &m->hdr;
-       con->out_kvec[v++].iov_len = sizeof(m->hdr);
-       con->out_kvec[v++] = m->front;
+       ceph_con_out_kvec_add(con, sizeof (tag_msg), &tag_msg);
+       ceph_con_out_kvec_add(con, sizeof (m->hdr), &m->hdr);
+       ceph_con_out_kvec_add(con, m->front.iov_len, m->front.iov_base);
+
        if (m->middle)
-               con->out_kvec[v++] = m->middle->vec;
-       con->out_kvec_left = v;
-       con->out_kvec_bytes += 1 + sizeof(m->hdr) + m->front.iov_len +
-               (m->middle ? m->middle->vec.iov_len : 0);
-       con->out_kvec_cur = con->out_kvec;
+               ceph_con_out_kvec_add(con, m->middle->vec.iov_len,
+                       m->middle->vec.iov_base);
 
        /* fill in crc (except data pages), footer */
-       con->out_msg->hdr.crc =
-               cpu_to_le32(crc32c(0, (void *)&m->hdr,
-                                     sizeof(m->hdr) - sizeof(m->hdr.crc)));
+       crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc));
+       con->out_msg->hdr.crc = cpu_to_le32(crc);
        con->out_msg->footer.flags = CEPH_MSG_FOOTER_COMPLETE;
-       con->out_msg->footer.front_crc =
-               cpu_to_le32(crc32c(0, m->front.iov_base, m->front.iov_len));
-       if (m->middle)
-               con->out_msg->footer.middle_crc =
-                       cpu_to_le32(crc32c(0, m->middle->vec.iov_base,
-                                          m->middle->vec.iov_len));
-       else
+
+       crc = crc32c(0, m->front.iov_base, m->front.iov_len);
+       con->out_msg->footer.front_crc = cpu_to_le32(crc);
+       if (m->middle) {
+               crc = crc32c(0, m->middle->vec.iov_base,
+                               m->middle->vec.iov_len);
+               con->out_msg->footer.middle_crc = cpu_to_le32(crc);
+       } else
                con->out_msg->footer.middle_crc = 0;
        con->out_msg->footer.data_crc = 0;
        dout("prepare_write_message front_crc %u data_crc %u\n",
@@ -549,11 +607,11 @@ static void prepare_write_message(struct ceph_connection *con)
                else
                        con->out_msg_pos.page_pos = 0;
                con->out_msg_pos.data_pos = 0;
-               con->out_msg_pos.did_page_crc = 0;
+               con->out_msg_pos.did_page_crc = false;
                con->out_more = 1;  /* data + footer will follow */
        } else {
                /* no, queue up footer too and be done */
-               prepare_write_message_footer(con, v);
+               prepare_write_message_footer(con);
        }
 
        set_bit(WRITE_PENDING, &con->state);
@@ -568,14 +626,14 @@ static void prepare_write_ack(struct ceph_connection *con)
             con->in_seq_acked, con->in_seq);
        con->in_seq_acked = con->in_seq;
 
-       con->out_kvec[0].iov_base = &tag_ack;
-       con->out_kvec[0].iov_len = 1;
+       ceph_con_out_kvec_reset(con);
+
+       ceph_con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
+
        con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
-       con->out_kvec[1].iov_base = &con->out_temp_ack;
-       con->out_kvec[1].iov_len = sizeof(con->out_temp_ack);
-       con->out_kvec_left = 2;
-       con->out_kvec_bytes = 1 + sizeof(con->out_temp_ack);
-       con->out_kvec_cur = con->out_kvec;
+       ceph_con_out_kvec_add(con, sizeof (con->out_temp_ack),
+                               &con->out_temp_ack);
+
        con->out_more = 1;  /* more will follow.. eventually.. */
        set_bit(WRITE_PENDING, &con->state);
 }
@@ -586,11 +644,8 @@ static void prepare_write_ack(struct ceph_connection *con)
 static void prepare_write_keepalive(struct ceph_connection *con)
 {
        dout("prepare_write_keepalive %p\n", con);
-       con->out_kvec[0].iov_base = &tag_keepalive;
-       con->out_kvec[0].iov_len = 1;
-       con->out_kvec_left = 1;
-       con->out_kvec_bytes = 1;
-       con->out_kvec_cur = con->out_kvec;
+       ceph_con_out_kvec_reset(con);
+       ceph_con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive);
        set_bit(WRITE_PENDING, &con->state);
 }
 
@@ -619,12 +674,9 @@ static int prepare_connect_authorizer(struct ceph_connection *con)
        con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol);
        con->out_connect.authorizer_len = cpu_to_le32(auth_len);
 
-       if (auth_len) {
-               con->out_kvec[con->out_kvec_left].iov_base = auth_buf;
-               con->out_kvec[con->out_kvec_left].iov_len = auth_len;
-               con->out_kvec_left++;
-               con->out_kvec_bytes += auth_len;
-       }
+       if (auth_len)
+               ceph_con_out_kvec_add(con, auth_len, auth_buf);
+
        return 0;
 }
 
@@ -634,22 +686,18 @@ static int prepare_connect_authorizer(struct ceph_connection *con)
 static void prepare_write_banner(struct ceph_messenger *msgr,
                                 struct ceph_connection *con)
 {
-       int len = strlen(CEPH_BANNER);
+       ceph_con_out_kvec_reset(con);
+       ceph_con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER);
+       ceph_con_out_kvec_add(con, sizeof (msgr->my_enc_addr),
+                                       &msgr->my_enc_addr);
 
-       con->out_kvec[0].iov_base = CEPH_BANNER;
-       con->out_kvec[0].iov_len = len;
-       con->out_kvec[1].iov_base = &msgr->my_enc_addr;
-       con->out_kvec[1].iov_len = sizeof(msgr->my_enc_addr);
-       con->out_kvec_left = 2;
-       con->out_kvec_bytes = len + sizeof(msgr->my_enc_addr);
-       con->out_kvec_cur = con->out_kvec;
        con->out_more = 0;
        set_bit(WRITE_PENDING, &con->state);
 }
 
 static int prepare_write_connect(struct ceph_messenger *msgr,
                                 struct ceph_connection *con,
-                                int after_banner)
+                                int include_banner)
 {
        unsigned global_seq = get_global_seq(con->msgr, 0);
        int proto;
@@ -678,22 +726,18 @@ static int prepare_write_connect(struct ceph_messenger *msgr,
        con->out_connect.protocol_version = cpu_to_le32(proto);
        con->out_connect.flags = 0;
 
-       if (!after_banner) {
-               con->out_kvec_left = 0;
-               con->out_kvec_bytes = 0;
-       }
-       con->out_kvec[con->out_kvec_left].iov_base = &con->out_connect;
-       con->out_kvec[con->out_kvec_left].iov_len = sizeof(con->out_connect);
-       con->out_kvec_left++;
-       con->out_kvec_bytes += sizeof(con->out_connect);
-       con->out_kvec_cur = con->out_kvec;
+       if (include_banner)
+               prepare_write_banner(msgr, con);
+       else
+               ceph_con_out_kvec_reset(con);
+       ceph_con_out_kvec_add(con, sizeof (con->out_connect), &con->out_connect);
+
        con->out_more = 0;
        set_bit(WRITE_PENDING, &con->state);
 
        return prepare_connect_authorizer(con);
 }
 
-
 /*
  * write as much of pending kvecs to the socket as we can.
  *  1 -> done
@@ -714,17 +758,18 @@ static int write_partial_kvec(struct ceph_connection *con)
                con->out_kvec_bytes -= ret;
                if (con->out_kvec_bytes == 0)
                        break;            /* done */
-               while (ret > 0) {
-                       if (ret >= con->out_kvec_cur->iov_len) {
-                               ret -= con->out_kvec_cur->iov_len;
-                               con->out_kvec_cur++;
-                               con->out_kvec_left--;
-                       } else {
-                               con->out_kvec_cur->iov_len -= ret;
-                               con->out_kvec_cur->iov_base += ret;
-                               ret = 0;
-                               break;
-                       }
+
+               /* account for full iov entries consumed */
+               while (ret >= con->out_kvec_cur->iov_len) {
+                       BUG_ON(!con->out_kvec_left);
+                       ret -= con->out_kvec_cur->iov_len;
+                       con->out_kvec_cur++;
+                       con->out_kvec_left--;
+               }
+               /* and for a partially-consumed entry */
+               if (ret) {
+                       con->out_kvec_cur->iov_len -= ret;
+                       con->out_kvec_cur->iov_base += ret;
                }
        }
        con->out_kvec_left = 0;
@@ -773,7 +818,7 @@ static int write_partial_msg_pages(struct ceph_connection *con)
        struct ceph_msg *msg = con->out_msg;
        unsigned data_len = le32_to_cpu(msg->hdr.data_len);
        size_t len;
-       int crc = con->msgr->nocrc;
+       bool do_datacrc = !con->msgr->nocrc;
        int ret;
        int total_max_write;
        int in_trail = 0;
@@ -790,9 +835,8 @@ static int write_partial_msg_pages(struct ceph_connection *con)
 
        while (data_len > con->out_msg_pos.data_pos) {
                struct page *page = NULL;
-               void *kaddr = NULL;
                int max_write = PAGE_SIZE;
-               int page_shift = 0;
+               int bio_offset = 0;
 
                total_max_write = data_len - trail_len -
                        con->out_msg_pos.data_pos;
@@ -811,58 +855,47 @@ static int write_partial_msg_pages(struct ceph_connection *con)
 
                        page = list_first_entry(&msg->trail->head,
                                                struct page, lru);
-                       if (crc)
-                               kaddr = kmap(page);
                        max_write = PAGE_SIZE;
                } else if (msg->pages) {
                        page = msg->pages[con->out_msg_pos.page];
-                       if (crc)
-                               kaddr = kmap(page);
                } else if (msg->pagelist) {
                        page = list_first_entry(&msg->pagelist->head,
                                                struct page, lru);
-                       if (crc)
-                               kaddr = kmap(page);
 #ifdef CONFIG_BLOCK
                } else if (msg->bio) {
                        struct bio_vec *bv;
 
                        bv = bio_iovec_idx(msg->bio_iter, msg->bio_seg);
                        page = bv->bv_page;
-                       page_shift = bv->bv_offset;
-                       if (crc)
-                               kaddr = kmap(page) + page_shift;
+                       bio_offset = bv->bv_offset;
                        max_write = bv->bv_len;
 #endif
                } else {
-                       page = con->msgr->zero_page;
-                       if (crc)
-                               kaddr = page_address(con->msgr->zero_page);
+                       page = zero_page;
                }
                len = min_t(int, max_write - con->out_msg_pos.page_pos,
                            total_max_write);
 
-               if (crc && !con->out_msg_pos.did_page_crc) {
-                       void *base = kaddr + con->out_msg_pos.page_pos;
+               if (do_datacrc && !con->out_msg_pos.did_page_crc) {
+                       void *base;
+                       u32 crc;
                        u32 tmpcrc = le32_to_cpu(con->out_msg->footer.data_crc);
+                       char *kaddr;
 
+                       kaddr = kmap(page);
                        BUG_ON(kaddr == NULL);
-                       con->out_msg->footer.data_crc =
-                               cpu_to_le32(crc32c(tmpcrc, base, len));
-                       con->out_msg_pos.did_page_crc = 1;
+                       base = kaddr + con->out_msg_pos.page_pos + bio_offset;
+                       crc = crc32c(tmpcrc, base, len);
+                       con->out_msg->footer.data_crc = cpu_to_le32(crc);
+                       con->out_msg_pos.did_page_crc = true;
                }
-               ret = kernel_sendpage(con->sock, page,
-                                     con->out_msg_pos.page_pos + page_shift,
-                                     len,
-                                     MSG_DONTWAIT | MSG_NOSIGNAL |
-                                     MSG_MORE);
-
-               if (crc &&
-                   (msg->pages || msg->pagelist || msg->bio || in_trail))
+               ret = ceph_tcp_sendpage(con->sock, page,
+                                     con->out_msg_pos.page_pos + bio_offset,
+                                     len, 1);
+
+               if (do_datacrc)
                        kunmap(page);
 
-               if (ret == -EAGAIN)
-                       ret = 0;
                if (ret <= 0)
                        goto out;
 
@@ -871,7 +904,7 @@ static int write_partial_msg_pages(struct ceph_connection *con)
                if (ret == len) {
                        con->out_msg_pos.page_pos = 0;
                        con->out_msg_pos.page++;
-                       con->out_msg_pos.did_page_crc = 0;
+                       con->out_msg_pos.did_page_crc = false;
                        if (in_trail)
                                list_move_tail(&page->lru,
                                               &msg->trail->head);
@@ -888,12 +921,10 @@ static int write_partial_msg_pages(struct ceph_connection *con)
        dout("write_partial_msg_pages %p msg %p done\n", con, msg);
 
        /* prepare and queue up footer, too */
-       if (!crc)
+       if (!do_datacrc)
                con->out_msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
-       con->out_kvec_bytes = 0;
-       con->out_kvec_left = 0;
-       con->out_kvec_cur = con->out_kvec;
-       prepare_write_message_footer(con, 0);
+       ceph_con_out_kvec_reset(con);
+       prepare_write_message_footer(con);
        ret = 1;
 out:
        return ret;
@@ -907,12 +938,9 @@ static int write_partial_skip(struct ceph_connection *con)
        int ret;
 
        while (con->out_skip > 0) {
-               struct kvec iov = {
-                       .iov_base = page_address(con->msgr->zero_page),
-                       .iov_len = min(con->out_skip, (int)PAGE_CACHE_SIZE)
-               };
+               size_t size = min(con->out_skip, (int) PAGE_CACHE_SIZE);
 
-               ret = ceph_tcp_sendmsg(con->sock, &iov, 1, iov.iov_len, 1);
+               ret = ceph_tcp_sendpage(con->sock, zero_page, 0, size, 1);
                if (ret <= 0)
                        goto out;
                con->out_skip -= ret;
@@ -1085,8 +1113,8 @@ static void addr_set_port(struct sockaddr_storage *ss, int p)
 static int ceph_pton(const char *str, size_t len, struct sockaddr_storage *ss,
                char delim, const char **ipend)
 {
-       struct sockaddr_in *in4 = (void *)ss;
-       struct sockaddr_in6 *in6 = (void *)ss;
+       struct sockaddr_in *in4 = (struct sockaddr_in *) ss;
+       struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) ss;
 
        memset(ss, 0, sizeof(*ss));
 
@@ -1512,10 +1540,9 @@ static int read_partial_message_section(struct ceph_connection *con,
                if (ret <= 0)
                        return ret;
                section->iov_len += ret;
-               if (section->iov_len == sec_len)
-                       *crc = crc32c(0, section->iov_base,
-                                     section->iov_len);
        }
+       if (section->iov_len == sec_len)
+               *crc = crc32c(0, section->iov_base, section->iov_len);
 
        return 1;
 }
@@ -1527,7 +1554,7 @@ static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
 
 static int read_partial_message_pages(struct ceph_connection *con,
                                      struct page **pages,
-                                     unsigned data_len, int datacrc)
+                                     unsigned data_len, bool do_datacrc)
 {
        void *p;
        int ret;
@@ -1540,7 +1567,7 @@ static int read_partial_message_pages(struct ceph_connection *con,
        p = kmap(pages[con->in_msg_pos.page]);
        ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
                               left);
-       if (ret > 0 && datacrc)
+       if (ret > 0 && do_datacrc)
                con->in_data_crc =
                        crc32c(con->in_data_crc,
                                  p + con->in_msg_pos.page_pos, ret);
@@ -1560,7 +1587,7 @@ static int read_partial_message_pages(struct ceph_connection *con,
 #ifdef CONFIG_BLOCK
 static int read_partial_message_bio(struct ceph_connection *con,
                                    struct bio **bio_iter, int *bio_seg,
-                                   unsigned data_len, int datacrc)
+                                   unsigned data_len, bool do_datacrc)
 {
        struct bio_vec *bv = bio_iovec_idx(*bio_iter, *bio_seg);
        void *p;
@@ -1576,7 +1603,7 @@ static int read_partial_message_bio(struct ceph_connection *con,
 
        ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
                               left);
-       if (ret > 0 && datacrc)
+       if (ret > 0 && do_datacrc)
                con->in_data_crc =
                        crc32c(con->in_data_crc,
                                  p + con->in_msg_pos.page_pos, ret);
@@ -1603,9 +1630,10 @@ static int read_partial_message(struct ceph_connection *con)
        int ret;
        int to, left;
        unsigned front_len, middle_len, data_len;
-       int datacrc = con->msgr->nocrc;
+       bool do_datacrc = !con->msgr->nocrc;
        int skip;
        u64 seq;
+       u32 crc;
 
        dout("read_partial_message con %p msg %p\n", con, m);
 
@@ -1618,17 +1646,16 @@ static int read_partial_message(struct ceph_connection *con)
                if (ret <= 0)
                        return ret;
                con->in_base_pos += ret;
-               if (con->in_base_pos == sizeof(con->in_hdr)) {
-                       u32 crc = crc32c(0, (void *)&con->in_hdr,
-                                sizeof(con->in_hdr) - sizeof(con->in_hdr.crc));
-                       if (crc != le32_to_cpu(con->in_hdr.crc)) {
-                               pr_err("read_partial_message bad hdr "
-                                      " crc %u != expected %u\n",
-                                      crc, con->in_hdr.crc);
-                               return -EBADMSG;
-                       }
-               }
        }
+
+       crc = crc32c(0, &con->in_hdr, offsetof(struct ceph_msg_header, crc));
+       if (cpu_to_le32(crc) != con->in_hdr.crc) {
+               pr_err("read_partial_message bad hdr "
+                      " crc %u != expected %u\n",
+                      crc, con->in_hdr.crc);
+               return -EBADMSG;
+       }
+
        front_len = le32_to_cpu(con->in_hdr.front_len);
        if (front_len > CEPH_MSG_MAX_FRONT_LEN)
                return -EIO;
@@ -1714,7 +1741,7 @@ static int read_partial_message(struct ceph_connection *con)
        while (con->in_msg_pos.data_pos < data_len) {
                if (m->pages) {
                        ret = read_partial_message_pages(con, m->pages,
-                                                data_len, datacrc);
+                                                data_len, do_datacrc);
                        if (ret <= 0)
                                return ret;
 #ifdef CONFIG_BLOCK
@@ -1722,7 +1749,7 @@ static int read_partial_message(struct ceph_connection *con)
 
                        ret = read_partial_message_bio(con,
                                                 &m->bio_iter, &m->bio_seg,
-                                                data_len, datacrc);
+                                                data_len, do_datacrc);
                        if (ret <= 0)
                                return ret;
 #endif
@@ -1757,7 +1784,7 @@ static int read_partial_message(struct ceph_connection *con)
                       m, con->in_middle_crc, m->footer.middle_crc);
                return -EBADMSG;
        }
-       if (datacrc &&
+       if (do_datacrc &&
            (m->footer.flags & CEPH_MSG_FOOTER_NOCRC) == 0 &&
            con->in_data_crc != le32_to_cpu(m->footer.data_crc)) {
                pr_err("read_partial_message %p data crc %u != exp. %u\n", m,
@@ -1819,7 +1846,6 @@ more:
 
        /* open the socket first? */
        if (con->sock == NULL) {
-               prepare_write_banner(msgr, con);
                prepare_write_connect(msgr, con, 1);
                prepare_read_banner(con);
                set_bit(CONNECTING, &con->state);
@@ -1829,11 +1855,9 @@ more:
                con->in_tag = CEPH_MSGR_TAG_READY;
                dout("try_write initiating connect on %p new state %lu\n",
                     con, con->state);
-               con->sock = ceph_tcp_connect(con);
-               if (IS_ERR(con->sock)) {
-                       con->sock = NULL;
+               ret = ceph_tcp_connect(con);
+               if (ret < 0) {
                        con->error_msg = "connect error";
-                       ret = -1;
                        goto out;
                }
        }
@@ -1953,8 +1977,9 @@ more:
                 *
                 * FIXME: there must be a better way to do this!
                 */
-               static char buf[1024];
-               int skip = min(1024, -con->in_base_pos);
+               static char buf[SKIP_BUF_SIZE];
+               int skip = min((int) sizeof (buf), -con->in_base_pos);
+
                dout("skipping %d / %d bytes\n", skip, -con->in_base_pos);
                ret = ceph_tcp_recvmsg(con->sock, buf, skip);
                if (ret <= 0)
@@ -2216,15 +2241,6 @@ struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr,
 
        spin_lock_init(&msgr->global_seq_lock);
 
-       /* the zero page is needed if a request is "canceled" while the message
-        * is being written over the socket */
-       msgr->zero_page = __page_cache_alloc(GFP_KERNEL | __GFP_ZERO);
-       if (!msgr->zero_page) {
-               kfree(msgr);
-               return ERR_PTR(-ENOMEM);
-       }
-       kmap(msgr->zero_page);
-
        if (myaddr)
                msgr->inst.addr = *myaddr;
 
@@ -2241,8 +2257,6 @@ EXPORT_SYMBOL(ceph_messenger_create);
 void ceph_messenger_destroy(struct ceph_messenger *msgr)
 {
        dout("destroy %p\n", msgr);
-       kunmap(msgr->zero_page);
-       __free_page(msgr->zero_page);
        kfree(msgr);
        dout("destroyed messenger %p\n", msgr);
 }
index fd863fe76934afb33bbee5583d68028366f23b91..29ad46ec9dcfc1d6de7c680c9555c00f5ac6546e 100644 (file)
@@ -283,7 +283,8 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
                ceph_decode_32_safe(p, end, yes, bad);
 #if BITS_PER_LONG == 32
                err = -EINVAL;
-               if (yes > ULONG_MAX / sizeof(struct crush_rule_step))
+               if (yes > (ULONG_MAX - sizeof(*r))
+                         / sizeof(struct crush_rule_step))
                        goto bad;
 #endif
                r = c->rules[i] = kmalloc(sizeof(*r) +