/*
 * (extraction residue from gitweb, kept as a comment)
 * Merge branch 'akpm' (Andrew's patch-bomb)
 * drivers/block/rbd.c
 */
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT	9
#define SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

/* It might be useful to have this defined elsewhere too */

#define U64_MAX	((u64) (~0ULL))

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

/* Prefix of the sysfs device names created for snapshots */
#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */
#define RBD_MAX_OPT_LEN		1024

/* "Snapshot" name used when mapping the base image rather than a snapshot */
#define RBD_SNAP_HEAD_NAME	"-"

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

/* Feature bits */

#define RBD_FEATURE_LAYERING      1

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_ALL          (0)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)

/* Default for the read_only mapping option */
#define RBD_READ_ONLY_DEFAULT		false
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These four fields never change for a given rbd image */
	char *object_prefix;	/* prefix for data object names (kmalloc'd) */
	u64 features;		/* feature bits (always 0 for format 1 images) */
	__u8 obj_order;		/* log2 of the object (segment) size */
	__u8 crypt_type;	/* copied from the on-disk header */
	__u8 comp_type;		/* copied from the on-disk header */

	/* The remaining fields need to be updated occasionally */
	u64 image_size;		/* image size in bytes */
	struct ceph_snap_context *snapc;	/* snapshot ids */
	char *snap_names;	/* snapshot name data from the on-disk header */
	u64 *snap_sizes;	/* per-snapshot sizes, indexed like snapc->snaps */

	u64 obj_version;	/* version of the header this was read from */
};
117
/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.
 */
struct rbd_spec {
	u64		pool_id;	/* id of the pool holding the image */
	char		*pool_name;

	char		*image_id;
	size_t		image_id_len;
	char		*image_name;
	size_t		image_name_len;

	u64		snap_id;	/* CEPH_NOSNAP when mapping the head */
	char		*snap_name;	/* RBD_SNAP_HEAD_NAME for the head */

	struct kref	kref;		/* specs are shared and refcounted */
};
138
/* User-supplied mapping options (see rbd_opts_tokens below) */
struct rbd_options {
	bool	read_only;	/* map read-only; default RBD_READ_ONLY_DEFAULT */
};
142
/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;	/* underlying ceph cluster client */
	struct kref		kref;		/* released via rbd_client_release() */
	struct list_head	node;		/* entry on global rbd_client_list */
};
151
/*
 * a request completion status
 */
struct rbd_req_status {
	int done;	/* nonzero once this request has completed */
	int rc;		/* completion result code */
	u64 bytes;	/* bytes transferred */
};
160
161 /*
162  * a collection of requests
163  */
164 struct rbd_req_coll {
165         int                     total;
166         int                     num_done;
167         struct kref             kref;
168         struct rbd_req_status   status[0];
169 };
170
/*
 * a single io request
 */
struct rbd_request {
	struct request		*rq;		/* blk layer request */
	struct bio		*bio;		/* cloned bio */
	struct page		**pages;	/* list of used pages */
	u64			len;		/* byte length of this request */
	int			coll_index;	/* slot in coll->status[] */
	struct rbd_req_coll	*coll;		/* owning collection, if any */
};
182
/* In-memory record of one snapshot of an image */
struct rbd_snap {
	struct	device		dev;		/* sysfs device for the snapshot */
	const char		*name;		/* snapshot name */
	u64			size;		/* image size at snapshot time */
	struct list_head	node;		/* entry on rbd_dev->snaps */
	u64			id;		/* snapshot id */
	u64			features;	/* feature bits for this snapshot */
};
191
/* Properties of whatever is currently mapped: the head or one snapshot */
struct rbd_mapping {
	u64			size;		/* mapped size in bytes */
	u64			features;
	bool			read_only;	/* snapshots are always mapped ro */
};
197
/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;	/* shared, refcounted ceph client */

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue lock */

	struct rbd_image_header	header;		/* in-memory image metadata */
	bool			exists;		/* mapped image/snapshot exists */
	struct rbd_spec		*spec;		/* what this device maps */

	char			*header_name;	/* name of the header object */

	struct ceph_osd_event	*watch_event;
	struct ceph_osd_request *watch_request;

	/* layering: parent image, if any (RBD_FEATURE_LAYERING) */
	struct rbd_spec		*parent_spec;
	u64			parent_overlap;	/* presumably bytes shared with parent — confirm */

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;

	struct rbd_mapping	mapping;	/* size/features/ro of the mapping */

	struct list_head	node;		/* entry on global rbd_dev_list */

	/* list of snapshots */
	struct list_head	snaps;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* opens; guarded by ctl_mutex */
};
240
static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);	/* protects rbd_dev_list */

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);	/* protects rbd_client_list */
248
249 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
250 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
251
252 static void rbd_dev_release(struct device *dev);
253 static void rbd_remove_snap_dev(struct rbd_snap *snap);
254
255 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
256                        size_t count);
257 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
258                           size_t count);
259
/* Write-only (S_IWUSR) bus control files backed by rbd_add()/rbd_remove() */
static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};
265
/* The "rbd" pseudo-bus; devices and control attributes hang off it */
static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};
270
/*
 * Release callback for rbd_root_dev.  The root device is statically
 * allocated, so there is nothing to free here; the device core still
 * requires a release callback to be provided.
 */
static void rbd_root_dev_release(struct device *dev)
{
}
274
/* Parent device for all rbd devices in sysfs */
static struct device rbd_root_dev = {
	.init_name =	"rbd",
	.release =	rbd_root_dev_release,
};
279
#ifdef RBD_DEBUG
/*
 * Verify an invariant; on failure report the failed expression and
 * its location, then BUG().  Compiles to nothing without RBD_DEBUG.
 *
 * Wrapped in do { } while (0) so the macro is a single statement and
 * composes safely with unbraced if/else (the original bare-if form
 * had a dangling-else hazard).
 */
#define rbd_assert(expr)						\
		do {							\
			if (unlikely(!(expr))) {			\
				printk(KERN_ERR "\nAssertion failure in %s() " \
							"at line %d:\n\n" \
						"\trbd_assert(%s);\n\n", \
						__func__, __LINE__, #expr); \
				BUG();					\
			}						\
		} while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */
292
293 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
294 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
295
/*
 * Open the block device.  Refuses writable opens of a read-only
 * mapping; otherwise takes a device reference, marks the blkdev ro
 * if needed, and bumps open_count under ctl_mutex.
 */
static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	/* ctl_mutex serializes open/close against setup/teardown */
	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	(void) get_device(&rbd_dev->dev);	/* hold device while it is open */
	set_device_ro(bdev, rbd_dev->mapping.read_only);
	rbd_dev->open_count++;			/* guarded by ctl_mutex */
	mutex_unlock(&ctl_mutex);

	return 0;
}
311
/*
 * Release the block device: undo what rbd_open() did, dropping the
 * open count and the device reference under ctl_mutex.
 */
static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	rbd_assert(rbd_dev->open_count > 0);	/* must pair with rbd_open() */
	rbd_dev->open_count--;
	put_device(&rbd_dev->dev);	/* drop the reference taken at open */
	mutex_unlock(&ctl_mutex);

	return 0;
}
324
/* Block device operations; only open/release are implemented here */
static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};
330
331 /*
332  * Initialize an rbd client instance.
333  * We own *ceph_opts.
334  */
335 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
336 {
337         struct rbd_client *rbdc;
338         int ret = -ENOMEM;
339
340         dout("rbd_client_create\n");
341         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
342         if (!rbdc)
343                 goto out_opt;
344
345         kref_init(&rbdc->kref);
346         INIT_LIST_HEAD(&rbdc->node);
347
348         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
349
350         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
351         if (IS_ERR(rbdc->client))
352                 goto out_mutex;
353         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
354
355         ret = ceph_open_session(rbdc->client);
356         if (ret < 0)
357                 goto out_err;
358
359         spin_lock(&rbd_client_list_lock);
360         list_add_tail(&rbdc->node, &rbd_client_list);
361         spin_unlock(&rbd_client_list_lock);
362
363         mutex_unlock(&ctl_mutex);
364
365         dout("rbd_client_create created %p\n", rbdc);
366         return rbdc;
367
368 out_err:
369         ceph_destroy_client(rbdc->client);
370 out_mutex:
371         mutex_unlock(&ctl_mutex);
372         kfree(rbdc);
373 out_opt:
374         if (ceph_opts)
375                 ceph_destroy_options(ceph_opts);
376         return ERR_PTR(ret);
377 }
378
/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 *
 * Returns NULL when sharing is disabled (CEPH_OPT_NOSHARE) or when
 * no matching client exists.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			/* Take a reference before dropping the list lock */
			kref_get(&client_node->kref);
			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}
403
/*
 * mount options
 *
 * The Opt_last_* values are range markers used by
 * parse_rbd_opts_token() to classify a token by argument type.
 */
enum {
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	/* Boolean args above */
	Opt_last_bool,
};
417
/* Token table for match_token(); entries must respect the enum ranges */
static match_table_t rbd_opts_tokens = {
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	/* Boolean args above */
	{-1, NULL}
};
428
/*
 * Parse one mapping option token into *private (a struct rbd_options).
 * Returns 0 on success, -EINVAL for an unrecognized token, or the
 * match_int() error for a malformed integer argument.
 */
static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	/* Classify the token using the Opt_last_* range markers */
	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		/* Every real token in rbd_opts_tokens must be handled above */
		rbd_assert(false);
		break;
	}
	return 0;
}
469
/*
 * Get a ceph client with specific addr and configuration; if one
 * does not exist, create it.  ceph_opts is consumed either way:
 * ownership passes to a newly created client, or the options are
 * destroyed when an existing client is reused.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *existing = rbd_client_find(ceph_opts);

	if (!existing)
		return rbd_client_create(ceph_opts);

	/* Reusing a client; the caller's options are no longer needed */
	ceph_destroy_options(ceph_opts);

	return existing;
}
486
/*
 * Destroy ceph client.  Kref release callback, invoked when the last
 * reference is dropped: unlinks the client from rbd_client_list and
 * tears down the underlying ceph client.
 *
 * NOTE(review): the original comment said "Caller must hold
 * rbd_client_list_lock", but the function takes that lock itself;
 * callers must NOT hold it.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("rbd_release_client %p\n", rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}
504
/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.  NULL is tolerated.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}
514
/*
 * Destroy requests collection.  Kref release callback for
 * struct rbd_req_coll; frees the collection and its status array
 * (allocated as one block).
 */
static void rbd_coll_release(struct kref *kref)
{
	struct rbd_req_coll *coll =
		container_of(kref, struct rbd_req_coll, kref);

	dout("rbd_coll_release %p\n", coll);
	kfree(coll);
}
526
527 static bool rbd_image_format_valid(u32 image_format)
528 {
529         return image_format == 1 || image_format == 2;
530 }
531
/*
 * Sanity-check an on-disk (format 1) image header before it is
 * translated into the in-memory form.  Returns false if any field
 * would be unusable.
 */
static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire the snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}
570
571 /*
572  * Create a new header structure, translate header format from the on-disk
573  * header.
574  */
575 static int rbd_header_from_disk(struct rbd_image_header *header,
576                                  struct rbd_image_header_ondisk *ondisk)
577 {
578         u32 snap_count;
579         size_t len;
580         size_t size;
581         u32 i;
582
583         memset(header, 0, sizeof (*header));
584
585         snap_count = le32_to_cpu(ondisk->snap_count);
586
587         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
588         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
589         if (!header->object_prefix)
590                 return -ENOMEM;
591         memcpy(header->object_prefix, ondisk->object_prefix, len);
592         header->object_prefix[len] = '\0';
593
594         if (snap_count) {
595                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
596
597                 /* Save a copy of the snapshot names */
598
599                 if (snap_names_len > (u64) SIZE_MAX)
600                         return -EIO;
601                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
602                 if (!header->snap_names)
603                         goto out_err;
604                 /*
605                  * Note that rbd_dev_v1_header_read() guarantees
606                  * the ondisk buffer we're working with has
607                  * snap_names_len bytes beyond the end of the
608                  * snapshot id array, this memcpy() is safe.
609                  */
610                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
611                         snap_names_len);
612
613                 /* Record each snapshot's size */
614
615                 size = snap_count * sizeof (*header->snap_sizes);
616                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
617                 if (!header->snap_sizes)
618                         goto out_err;
619                 for (i = 0; i < snap_count; i++)
620                         header->snap_sizes[i] =
621                                 le64_to_cpu(ondisk->snaps[i].image_size);
622         } else {
623                 WARN_ON(ondisk->snap_names_len);
624                 header->snap_names = NULL;
625                 header->snap_sizes = NULL;
626         }
627
628         header->features = 0;   /* No features support in v1 images */
629         header->obj_order = ondisk->options.order;
630         header->crypt_type = ondisk->options.crypt_type;
631         header->comp_type = ondisk->options.comp_type;
632
633         /* Allocate and fill in the snapshot context */
634
635         header->image_size = le64_to_cpu(ondisk->image_size);
636         size = sizeof (struct ceph_snap_context);
637         size += snap_count * sizeof (header->snapc->snaps[0]);
638         header->snapc = kzalloc(size, GFP_KERNEL);
639         if (!header->snapc)
640                 goto out_err;
641
642         atomic_set(&header->snapc->nref, 1);
643         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
644         header->snapc->num_snaps = snap_count;
645         for (i = 0; i < snap_count; i++)
646                 header->snapc->snaps[i] =
647                         le64_to_cpu(ondisk->snaps[i].id);
648
649         return 0;
650
651 out_err:
652         kfree(header->snap_sizes);
653         header->snap_sizes = NULL;
654         kfree(header->snap_names);
655         header->snap_names = NULL;
656         kfree(header->object_prefix);
657         header->object_prefix = NULL;
658
659         return -ENOMEM;
660 }
661
/*
 * Map a snapshot id to its name.  CEPH_NOSNAP names the base image
 * (RBD_SNAP_HEAD_NAME).  Returns NULL if the id is not on the
 * device's snapshot list.
 */
static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct rbd_snap *snap;

	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	list_for_each_entry(snap, &rbd_dev->snaps, node)
		if (snap_id == snap->id)
			return snap->name;

	return NULL;
}
675
/*
 * Look up a snapshot by name.  On success, record its id in the
 * device's spec and its size/features in the mapping, and return 0.
 * Returns -ENOENT if no snapshot with that name exists.
 */
static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
{

	struct rbd_snap *snap;

	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		if (!strcmp(snap_name, snap->name)) {
			rbd_dev->spec->snap_id = snap->id;
			rbd_dev->mapping.size = snap->size;
			rbd_dev->mapping.features = snap->features;

			return 0;
		}
	}

	return -ENOENT;
}
693
/*
 * Fill in rbd_dev->mapping (and spec->snap_id) for the entity named
 * by spec->snap_name: the base image for RBD_SNAP_HEAD_NAME, or a
 * snapshot otherwise.  Snapshot mappings are forced read-only.
 * Returns 0 on success or -ENOENT if the snapshot does not exist.
 */
static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
{
	int ret;

	if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
		    sizeof (RBD_SNAP_HEAD_NAME))) {
		/* Mapping the head: current size/features from the header */
		rbd_dev->spec->snap_id = CEPH_NOSNAP;
		rbd_dev->mapping.size = rbd_dev->header.image_size;
		rbd_dev->mapping.features = rbd_dev->header.features;
		ret = 0;
	} else {
		ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
		if (ret < 0)
			goto done;
		rbd_dev->mapping.read_only = true;
	}
	rbd_dev->exists = true;
done:
	return ret;
}
714
715 static void rbd_header_free(struct rbd_image_header *header)
716 {
717         kfree(header->object_prefix);
718         header->object_prefix = NULL;
719         kfree(header->snap_sizes);
720         header->snap_sizes = NULL;
721         kfree(header->snap_names);
722         header->snap_names = NULL;
723         ceph_put_snap_context(header->snapc);
724         header->snapc = NULL;
725 }
726
/*
 * Format the name of the object backing the segment that contains
 * image byte "offset": "<object_prefix>.<segment, 12 hex digits>".
 * Returns a GFP_NOIO allocation the caller must kfree(), or NULL on
 * allocation or formatting failure.
 */
static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;

	name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
	if (!name)
		return NULL;
	segment = offset >> rbd_dev->header.obj_order;
	ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
		/* Formatting error or name would have been truncated */
		pr_err("error formatting segment name for #%llu (%d)\n",
			segment, ret);
		kfree(name);
		name = NULL;
	}

	return name;
}
748
749 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
750 {
751         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
752
753         return offset & (segment_size - 1);
754 }
755
/*
 * Clip "length" so the extent starting at image byte "offset" does
 * not cross a segment (object) boundary.  Returns the possibly
 * reduced length.
 */
static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	/* Only the offset within the segment matters below */
	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}
769
/*
 * Number of segments (objects) spanned by the byte range
 * [ofs, ofs + len).  Returns 0 for an empty range and -ERANGE if the
 * range would wrap past the end of the 64-bit offset space.
 *
 * NOTE(review): the 64-bit segment arithmetic is returned as an int;
 * an extremely large range could truncate — confirm callers bound len.
 */
static int rbd_get_num_segments(struct rbd_image_header *header,
				u64 ofs, u64 len)
{
	u64 start_seg;
	u64 end_seg;

	if (!len)
		return 0;
	if (len - 1 > U64_MAX - ofs)
		return -ERANGE;

	start_seg = ofs >> header->obj_order;
	end_seg = (ofs + len - 1) >> header->obj_order;

	return end_seg - start_seg + 1;
}
786
787 /*
788  * returns the size of an object in the image
789  */
790 static u64 rbd_obj_bytes(struct rbd_image_header *header)
791 {
792         return 1 << header->obj_order;
793 }
794
795 /*
796  * bio helpers
797  */
798
799 static void bio_chain_put(struct bio *chain)
800 {
801         struct bio *tmp;
802
803         while (chain) {
804                 tmp = chain;
805                 chain = chain->bi_next;
806                 bio_put(tmp);
807         }
808 }
809
/*
 * zeros a bio chain, starting at specific offset
 *
 * "start_ofs" is a byte offset from the beginning of the chain; data
 * before it is preserved, everything from it onward is zeroed in
 * place via temporary kmaps of each segment.
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;	/* running byte offset within the chain */

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				/* Zero the tail (or all) of this segment */
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}
836
/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 *
 * Returns the clone, or NULL on allocation failure or when the
 * requested range does not lie entirely within bio_src.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio_vec *bv;
	unsigned int resid;
	unsigned short idx;
	unsigned int voff;
	unsigned short end_idx;
	unsigned short vcnt;
	struct bio *bio;

	/* Handle the easy case for the caller */

	if (!offset && len == bio_src->bi_size)
		return bio_clone(bio_src, gfpmask);

	if (WARN_ON_ONCE(!len))
		return NULL;
	if (WARN_ON_ONCE(len > bio_src->bi_size))
		return NULL;
	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
		return NULL;

	/* Find first affected segment... */

	resid = offset;
	__bio_for_each_segment(bv, bio_src, idx, 0) {
		if (resid < bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	voff = resid;	/* byte offset of the range within segment idx */

	/* ...and the last affected segment */

	resid += len;
	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
		if (resid <= bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	vcnt = end_idx - idx + 1;	/* bio_vec entries in the clone */

	/* Build the clone */

	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio->bi_bdev = bio_src->bi_bdev;
	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
	bio->bi_rw = bio_src->bi_rw;
	bio->bi_flags |= 1 << BIO_CLONED;

	/*
	 * Copy over our part of the bio_vec, then update the first
	 * and last (or only) entries.
	 */
	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
			vcnt * sizeof (struct bio_vec));
	bio->bi_io_vec[0].bv_offset += voff;
	if (vcnt > 1) {
		bio->bi_io_vec[0].bv_len -= voff;
		/* resid is now the number of bytes used in the last vec */
		bio->bi_io_vec[vcnt - 1].bv_len = resid;
	} else {
		bio->bi_io_vec[0].bv_len = len;
	}

	bio->bi_vcnt = vcnt;
	bio->bi_size = len;
	bio->bi_idx = 0;

	return bio;
}
917
/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;	/* where to link the next clone */

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi)
			goto out_err;	/* EINVAL; ran out of bio's */
		/* Clone at most the rest of this bio, at most len bytes */
		bi_size = min_t(unsigned int, bi->bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_size) {
			/* This source bio is exhausted; advance */
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	/* Drop any clones made before the failure */
	bio_chain_put(chain);

	return NULL;
}
978
979 /*
980  * helpers for osd request op vectors.
981  */
982 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
983                                         int opcode, u32 payload_len)
984 {
985         struct ceph_osd_req_op *ops;
986
987         ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
988         if (!ops)
989                 return NULL;
990
991         ops[0].op = opcode;
992
993         /*
994          * op extent offset and length will be set later on
995          * in calc_raw_layout()
996          */
997         ops[0].payload_len = payload_len;
998
999         return ops;
1000 }
1001
/* Free an op vector allocated by rbd_create_rw_ops() */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
        kfree(ops);
}
1006
1007 static void rbd_coll_end_req_index(struct request *rq,
1008                                    struct rbd_req_coll *coll,
1009                                    int index,
1010                                    int ret, u64 len)
1011 {
1012         struct request_queue *q;
1013         int min, max, i;
1014
1015         dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
1016              coll, index, ret, (unsigned long long) len);
1017
1018         if (!rq)
1019                 return;
1020
1021         if (!coll) {
1022                 blk_end_request(rq, ret, len);
1023                 return;
1024         }
1025
1026         q = rq->q;
1027
1028         spin_lock_irq(q->queue_lock);
1029         coll->status[index].done = 1;
1030         coll->status[index].rc = ret;
1031         coll->status[index].bytes = len;
1032         max = min = coll->num_done;
1033         while (max < coll->total && coll->status[max].done)
1034                 max++;
1035
1036         for (i = min; i<max; i++) {
1037                 __blk_end_request(rq, coll->status[i].rc,
1038                                   coll->status[i].bytes);
1039                 coll->num_done++;
1040                 kref_put(&coll->kref, rbd_coll_release);
1041         }
1042         spin_unlock_irq(q->queue_lock);
1043 }
1044
/*
 * Complete the collection entry belonging to the given rbd request,
 * reporting its result code and byte count.
 */
static void rbd_coll_end_req(struct rbd_request *req,
                             int ret, u64 len)
{
        rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
}
1050
/*
 * Send ceph osd request
 *
 * Build and submit one osd request for the named object.  If rbd_cb
 * is non-NULL the request completes asynchronously through that
 * callback; otherwise we wait for the result here.  On submission
 * failure the collection entry (if any) is completed with the error
 * and all resources are released.
 *
 * Returns 0 (async) or the osd result (sync), or a negative errno.
 */
static int rbd_do_request(struct request *rq,
                          struct rbd_device *rbd_dev,
                          struct ceph_snap_context *snapc,
                          u64 snapid,
                          const char *object_name, u64 ofs, u64 len,
                          struct bio *bio,
                          struct page **pages,
                          int num_pages,
                          int flags,
                          struct ceph_osd_req_op *ops,
                          struct rbd_req_coll *coll,
                          int coll_index,
                          void (*rbd_cb)(struct ceph_osd_request *req,
                                         struct ceph_msg *msg),
                          struct ceph_osd_request **linger_req,
                          u64 *ver)
{
        struct ceph_osd_request *req;
        struct ceph_file_layout *layout;
        int ret;
        u64 bno;
        struct timespec mtime = CURRENT_TIME;
        struct rbd_request *req_data;
        struct ceph_osd_request_head *reqhead;
        struct ceph_osd_client *osdc;

        req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
        if (!req_data) {
                /* Can't use rbd_coll_end_req(): we have no req_data yet */
                if (coll)
                        rbd_coll_end_req_index(rq, coll, coll_index,
                                               -ENOMEM, len);
                return -ENOMEM;
        }

        if (coll) {
                req_data->coll = coll;
                req_data->coll_index = coll_index;
        }

        dout("rbd_do_request object_name=%s ofs=%llu len=%llu coll=%p[%d]\n",
                object_name, (unsigned long long) ofs,
                (unsigned long long) len, coll, coll_index);

        osdc = &rbd_dev->rbd_client->client->osdc;
        req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
                                        false, GFP_NOIO, pages, bio);
        if (!req) {
                ret = -ENOMEM;
                goto done_pages;
        }

        req->r_callback = rbd_cb;

        req_data->rq = rq;
        req_data->bio = bio;
        req_data->pages = pages;
        req_data->len = len;

        req->r_priv = req_data;

        reqhead = req->r_request->front.iov_base;
        reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);

        /*
         * NOTE(review): strncpy() does not NUL-terminate if
         * object_name fills r_oid exactly; strlen() below would then
         * read past the buffer.  Presumably object names are always
         * shorter than r_oid -- confirm against the name generators.
         */
        strncpy(req->r_oid, object_name, sizeof(req->r_oid));
        req->r_oid_len = strlen(req->r_oid);

        /* One object per "file": stripe unit == object size */
        layout = &req->r_file_layout;
        memset(layout, 0, sizeof(*layout));
        layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
        layout->fl_stripe_count = cpu_to_le32(1);
        layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
        layout->fl_pg_pool = cpu_to_le32((int) rbd_dev->spec->pool_id);
        ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
                                   req, ops);
        rbd_assert(ret == 0);

        ceph_osdc_build_request(req, ofs, &len,
                                ops,
                                snapc,
                                &mtime,
                                req->r_oid, req->r_oid_len);

        /* A lingering request persists so we keep receiving notifies */
        if (linger_req) {
                ceph_osdc_set_request_linger(osdc, req);
                *linger_req = req;
        }

        ret = ceph_osdc_start_request(osdc, req, false);
        if (ret < 0)
                goto done_err;

        /* No callback supplied: wait for the reply synchronously */
        if (!rbd_cb) {
                ret = ceph_osdc_wait_request(osdc, req);
                if (ver)
                        *ver = le64_to_cpu(req->r_reassert_version.version);
                dout("reassert_ver=%llu\n",
                        (unsigned long long)
                                le64_to_cpu(req->r_reassert_version.version));
                ceph_osdc_put_request(req);
        }
        return ret;

done_err:
        bio_chain_put(req_data->bio);
        ceph_osdc_put_request(req);
done_pages:
        rbd_coll_end_req(req_data, ret, len);
        kfree(req_data);
        return ret;
}
1164
1165 /*
1166  * Ceph osd op callback
1167  */
1168 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1169 {
1170         struct rbd_request *req_data = req->r_priv;
1171         struct ceph_osd_reply_head *replyhead;
1172         struct ceph_osd_op *op;
1173         __s32 rc;
1174         u64 bytes;
1175         int read_op;
1176
1177         /* parse reply */
1178         replyhead = msg->front.iov_base;
1179         WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
1180         op = (void *)(replyhead + 1);
1181         rc = le32_to_cpu(replyhead->result);
1182         bytes = le64_to_cpu(op->extent.length);
1183         read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
1184
1185         dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1186                 (unsigned long long) bytes, read_op, (int) rc);
1187
1188         if (rc == -ENOENT && read_op) {
1189                 zero_bio_chain(req_data->bio, 0);
1190                 rc = 0;
1191         } else if (rc == 0 && read_op && bytes < req_data->len) {
1192                 zero_bio_chain(req_data->bio, bytes);
1193                 bytes = req_data->len;
1194         }
1195
1196         rbd_coll_end_req(req_data, rc, bytes);
1197
1198         if (req_data->bio)
1199                 bio_chain_put(req_data->bio);
1200
1201         ceph_osdc_put_request(req);
1202         kfree(req_data);
1203 }
1204
/*
 * Completion callback for requests whose result is ignored; it only
 * drops the osd request reference.
 */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
        ceph_osdc_put_request(req);
}
1209
1210 /*
1211  * Do a synchronous ceph osd operation
1212  */
1213 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1214                            struct ceph_snap_context *snapc,
1215                            u64 snapid,
1216                            int flags,
1217                            struct ceph_osd_req_op *ops,
1218                            const char *object_name,
1219                            u64 ofs, u64 inbound_size,
1220                            char *inbound,
1221                            struct ceph_osd_request **linger_req,
1222                            u64 *ver)
1223 {
1224         int ret;
1225         struct page **pages;
1226         int num_pages;
1227
1228         rbd_assert(ops != NULL);
1229
1230         num_pages = calc_pages_for(ofs, inbound_size);
1231         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1232         if (IS_ERR(pages))
1233                 return PTR_ERR(pages);
1234
1235         ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1236                           object_name, ofs, inbound_size, NULL,
1237                           pages, num_pages,
1238                           flags,
1239                           ops,
1240                           NULL, 0,
1241                           NULL,
1242                           linger_req, ver);
1243         if (ret < 0)
1244                 goto done;
1245
1246         if ((flags & CEPH_OSD_FLAG_READ) && inbound)
1247                 ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);
1248
1249 done:
1250         ceph_release_page_vector(pages, num_pages);
1251         return ret;
1252 }
1253
1254 /*
1255  * Do an asynchronous ceph osd operation
1256  */
1257 static int rbd_do_op(struct request *rq,
1258                      struct rbd_device *rbd_dev,
1259                      struct ceph_snap_context *snapc,
1260                      u64 ofs, u64 len,
1261                      struct bio *bio,
1262                      struct rbd_req_coll *coll,
1263                      int coll_index)
1264 {
1265         char *seg_name;
1266         u64 seg_ofs;
1267         u64 seg_len;
1268         int ret;
1269         struct ceph_osd_req_op *ops;
1270         u32 payload_len;
1271         int opcode;
1272         int flags;
1273         u64 snapid;
1274
1275         seg_name = rbd_segment_name(rbd_dev, ofs);
1276         if (!seg_name)
1277                 return -ENOMEM;
1278         seg_len = rbd_segment_length(rbd_dev, ofs, len);
1279         seg_ofs = rbd_segment_offset(rbd_dev, ofs);
1280
1281         if (rq_data_dir(rq) == WRITE) {
1282                 opcode = CEPH_OSD_OP_WRITE;
1283                 flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK;
1284                 snapid = CEPH_NOSNAP;
1285                 payload_len = seg_len;
1286         } else {
1287                 opcode = CEPH_OSD_OP_READ;
1288                 flags = CEPH_OSD_FLAG_READ;
1289                 snapc = NULL;
1290                 snapid = rbd_dev->spec->snap_id;
1291                 payload_len = 0;
1292         }
1293
1294         ret = -ENOMEM;
1295         ops = rbd_create_rw_ops(1, opcode, payload_len);
1296         if (!ops)
1297                 goto done;
1298
1299         /* we've taken care of segment sizes earlier when we
1300            cloned the bios. We should never have a segment
1301            truncated at this point */
1302         rbd_assert(seg_len == len);
1303
1304         ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1305                              seg_name, seg_ofs, seg_len,
1306                              bio,
1307                              NULL, 0,
1308                              flags,
1309                              ops,
1310                              coll, coll_index,
1311                              rbd_req_cb, 0, NULL);
1312
1313         rbd_destroy_ops(ops);
1314 done:
1315         kfree(seg_name);
1316         return ret;
1317 }
1318
1319 /*
1320  * Request sync osd read
1321  */
1322 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1323                           u64 snapid,
1324                           const char *object_name,
1325                           u64 ofs, u64 len,
1326                           char *buf,
1327                           u64 *ver)
1328 {
1329         struct ceph_osd_req_op *ops;
1330         int ret;
1331
1332         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1333         if (!ops)
1334                 return -ENOMEM;
1335
1336         ret = rbd_req_sync_op(rbd_dev, NULL,
1337                                snapid,
1338                                CEPH_OSD_FLAG_READ,
1339                                ops, object_name, ofs, len, buf, NULL, ver);
1340         rbd_destroy_ops(ops);
1341
1342         return ret;
1343 }
1344
1345 /*
1346  * Request sync osd watch
1347  */
1348 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1349                                    u64 ver,
1350                                    u64 notify_id)
1351 {
1352         struct ceph_osd_req_op *ops;
1353         int ret;
1354
1355         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1356         if (!ops)
1357                 return -ENOMEM;
1358
1359         ops[0].watch.ver = cpu_to_le64(ver);
1360         ops[0].watch.cookie = notify_id;
1361         ops[0].watch.flag = 0;
1362
1363         ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1364                           rbd_dev->header_name, 0, 0, NULL,
1365                           NULL, 0,
1366                           CEPH_OSD_FLAG_READ,
1367                           ops,
1368                           NULL, 0,
1369                           rbd_simple_req_cb, 0, NULL);
1370
1371         rbd_destroy_ops(ops);
1372         return ret;
1373 }
1374
1375 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1376 {
1377         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1378         u64 hver;
1379         int rc;
1380
1381         if (!rbd_dev)
1382                 return;
1383
1384         dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1385                 rbd_dev->header_name, (unsigned long long) notify_id,
1386                 (unsigned int) opcode);
1387         rc = rbd_dev_refresh(rbd_dev, &hver);
1388         if (rc)
1389                 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1390                            " update snaps: %d\n", rbd_dev->major, rc);
1391
1392         rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1393 }
1394
/*
 * Request sync osd watch
 *
 * Register a watch on the header object so rbd_watch_cb() is
 * invoked when another client updates the image header.  The osd
 * event is created first so its cookie can accompany the watch op;
 * it is cancelled again if registering the watch fails.
 *
 * Returns 0 on success or a negative errno.
 */
static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
{
        struct ceph_osd_req_op *ops;
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        int ret;

        ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
        if (!ops)
                return -ENOMEM;

        ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
                                     (void *)rbd_dev, &rbd_dev->watch_event);
        if (ret < 0)
                goto fail;

        /* flag == 1 registers the watch */
        ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
        ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
        ops[0].watch.flag = 1;

        /* Linger (via watch_request) so notifications keep arriving */
        ret = rbd_req_sync_op(rbd_dev, NULL,
                              CEPH_NOSNAP,
                              CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
                              ops,
                              rbd_dev->header_name,
                              0, 0, NULL,
                              &rbd_dev->watch_request, NULL);

        if (ret < 0)
                goto fail_event;

        rbd_destroy_ops(ops);
        return 0;

fail_event:
        ceph_osdc_cancel_event(rbd_dev->watch_event);
        rbd_dev->watch_event = NULL;
fail:
        rbd_destroy_ops(ops);
        return ret;
}
1438
1439 /*
1440  * Request sync osd unwatch
1441  */
1442 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1443 {
1444         struct ceph_osd_req_op *ops;
1445         int ret;
1446
1447         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1448         if (!ops)
1449                 return -ENOMEM;
1450
1451         ops[0].watch.ver = 0;
1452         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1453         ops[0].watch.flag = 0;
1454
1455         ret = rbd_req_sync_op(rbd_dev, NULL,
1456                               CEPH_NOSNAP,
1457                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1458                               ops,
1459                               rbd_dev->header_name,
1460                               0, 0, NULL, NULL, NULL);
1461
1462
1463         rbd_destroy_ops(ops);
1464         ceph_osdc_cancel_event(rbd_dev->watch_event);
1465         rbd_dev->watch_event = NULL;
1466         return ret;
1467 }
1468
1469 /*
1470  * Synchronous osd object method call
1471  */
1472 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1473                              const char *object_name,
1474                              const char *class_name,
1475                              const char *method_name,
1476                              const char *outbound,
1477                              size_t outbound_size,
1478                              char *inbound,
1479                              size_t inbound_size,
1480                              int flags,
1481                              u64 *ver)
1482 {
1483         struct ceph_osd_req_op *ops;
1484         int class_name_len = strlen(class_name);
1485         int method_name_len = strlen(method_name);
1486         int payload_size;
1487         int ret;
1488
1489         /*
1490          * Any input parameters required by the method we're calling
1491          * will be sent along with the class and method names as
1492          * part of the message payload.  That data and its size are
1493          * supplied via the indata and indata_len fields (named from
1494          * the perspective of the server side) in the OSD request
1495          * operation.
1496          */
1497         payload_size = class_name_len + method_name_len + outbound_size;
1498         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL, payload_size);
1499         if (!ops)
1500                 return -ENOMEM;
1501
1502         ops[0].cls.class_name = class_name;
1503         ops[0].cls.class_len = (__u8) class_name_len;
1504         ops[0].cls.method_name = method_name;
1505         ops[0].cls.method_len = (__u8) method_name_len;
1506         ops[0].cls.argc = 0;
1507         ops[0].cls.indata = outbound;
1508         ops[0].cls.indata_len = outbound_size;
1509
1510         ret = rbd_req_sync_op(rbd_dev, NULL,
1511                                CEPH_NOSNAP,
1512                                flags, ops,
1513                                object_name, 0, inbound_size, inbound,
1514                                NULL, ver);
1515
1516         rbd_destroy_ops(ops);
1517
1518         dout("cls_exec returned %d\n", ret);
1519         return ret;
1520 }
1521
1522 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1523 {
1524         struct rbd_req_coll *coll =
1525                         kzalloc(sizeof(struct rbd_req_coll) +
1526                                 sizeof(struct rbd_req_status) * num_reqs,
1527                                 GFP_ATOMIC);
1528
1529         if (!coll)
1530                 return NULL;
1531         coll->total = num_reqs;
1532         kref_init(&coll->kref);
1533         return coll;
1534 }
1535
/*
 * block device queue callback
 *
 * Called with q->queue_lock held.  Fetches each pending request,
 * drops the queue lock while splitting it into per-object segments,
 * and submits one asynchronous osd op per segment.  A rbd_req_coll
 * tracks segment completions so the block request is completed in
 * order as the segments finish.
 */
static void rbd_rq_fn(struct request_queue *q)
{
        struct rbd_device *rbd_dev = q->queuedata;
        struct request *rq;

        while ((rq = blk_fetch_request(q))) {
                struct bio *bio;
                bool do_write;
                unsigned int size;
                u64 ofs;
                int num_segs, cur_seg = 0;
                struct rbd_req_coll *coll;
                struct ceph_snap_context *snapc;
                unsigned int bio_offset;

                dout("fetched request\n");

                /* filter out block requests we don't understand */
                if ((rq->cmd_type != REQ_TYPE_FS)) {
                        __blk_end_request_all(rq, 0);
                        continue;
                }

                /* deduce our operation (read, write) */
                do_write = (rq_data_dir(rq) == WRITE);
                if (do_write && rbd_dev->mapping.read_only) {
                        __blk_end_request_all(rq, -EROFS);
                        continue;
                }

                /* Drop the queue lock around the slow submission work */
                spin_unlock_irq(q->queue_lock);

                down_read(&rbd_dev->header_rwsem);

                /* A mapped snapshot may have been deleted under us */
                if (!rbd_dev->exists) {
                        rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
                        up_read(&rbd_dev->header_rwsem);
                        dout("request for non-existent snapshot");
                        spin_lock_irq(q->queue_lock);
                        __blk_end_request_all(rq, -ENXIO);
                        continue;
                }

                /* Pin one snap context for all ops of this request */
                snapc = ceph_get_snap_context(rbd_dev->header.snapc);

                up_read(&rbd_dev->header_rwsem);

                size = blk_rq_bytes(rq);
                ofs = blk_rq_pos(rq) * SECTOR_SIZE;
                bio = rq->bio;

                dout("%s 0x%x bytes at 0x%llx\n",
                     do_write ? "write" : "read",
                     size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);

                num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
                if (num_segs <= 0) {
                        spin_lock_irq(q->queue_lock);
                        __blk_end_request_all(rq, num_segs);
                        ceph_put_snap_context(snapc);
                        continue;
                }
                coll = rbd_alloc_coll(num_segs);
                if (!coll) {
                        spin_lock_irq(q->queue_lock);
                        __blk_end_request_all(rq, -ENOMEM);
                        ceph_put_snap_context(snapc);
                        continue;
                }

                bio_offset = 0;
                do {
                        u64 limit = rbd_segment_length(rbd_dev, ofs, size);
                        unsigned int chain_size;
                        struct bio *bio_chain;

                        BUG_ON(limit > (u64) UINT_MAX);
                        chain_size = (unsigned int) limit;
                        dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);

                        /* One collection reference per submitted segment */
                        kref_get(&coll->kref);

                        /* Pass a cloned bio chain via an osd request */

                        bio_chain = bio_chain_clone_range(&bio,
                                                &bio_offset, chain_size,
                                                GFP_ATOMIC);
                        if (bio_chain)
                                (void) rbd_do_op(rq, rbd_dev, snapc,
                                                ofs, chain_size,
                                                bio_chain, coll, cur_seg);
                        else
                                /* Clone failed; fail just this segment */
                                rbd_coll_end_req_index(rq, coll, cur_seg,
                                                       -ENOMEM, chain_size);
                        size -= chain_size;
                        ofs += chain_size;

                        cur_seg++;
                } while (size > 0);
                /* Drop the initial reference from rbd_alloc_coll() */
                kref_put(&coll->kref, rbd_coll_release);

                /* Reacquire the queue lock before the next fetch */
                spin_lock_irq(q->queue_lock);

                ceph_put_snap_context(snapc);
        }
}
1645
1646 /*
1647  * a queue callback. Makes sure that we don't create a bio that spans across
1648  * multiple osd objects. One exception would be with a single page bios,
1649  * which we handle later at bio_chain_clone_range()
1650  */
1651 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1652                           struct bio_vec *bvec)
1653 {
1654         struct rbd_device *rbd_dev = q->queuedata;
1655         sector_t sector_offset;
1656         sector_t sectors_per_obj;
1657         sector_t obj_sector_offset;
1658         int ret;
1659
1660         /*
1661          * Find how far into its rbd object the partition-relative
1662          * bio start sector is to offset relative to the enclosing
1663          * device.
1664          */
1665         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
1666         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1667         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
1668
1669         /*
1670          * Compute the number of bytes from that offset to the end
1671          * of the object.  Account for what's already used by the bio.
1672          */
1673         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
1674         if (ret > bmd->bi_size)
1675                 ret -= bmd->bi_size;
1676         else
1677                 ret = 0;
1678
1679         /*
1680          * Don't send back more than was asked for.  And if the bio
1681          * was empty, let the whole thing through because:  "Note
1682          * that a block device *must* allow a single page to be
1683          * added to an empty bio."
1684          */
1685         rbd_assert(bvec->bv_len <= PAGE_SIZE);
1686         if (ret > (int) bvec->bv_len || !bmd->bi_size)
1687                 ret = (int) bvec->bv_len;
1688
1689         return ret;
1690 }
1691
1692 static void rbd_free_disk(struct rbd_device *rbd_dev)
1693 {
1694         struct gendisk *disk = rbd_dev->disk;
1695
1696         if (!disk)
1697                 return;
1698
1699         if (disk->flags & GENHD_FL_UP)
1700                 del_gendisk(disk);
1701         if (disk->queue)
1702                 blk_cleanup_queue(disk->queue);
1703         put_disk(disk);
1704 }
1705
/*
 * Read the complete header for the given rbd device.
 *
 * Returns a pointer to a dynamically-allocated buffer containing
 * the complete and validated header.  Caller can pass the address
 * of a variable that will be filled in with the version of the
 * header object at the time it was read.
 *
 * Returns a pointer-coded errno if a failure occurs.
 */
static struct rbd_image_header_ondisk *
rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
{
        struct rbd_image_header_ondisk *ondisk = NULL;
        u32 snap_count = 0;     /* snapshot count seen on previous pass */
        u64 names_size = 0;     /* snapshot-names size from previous pass */
        u32 want_count;
        int ret;

        /*
         * The complete header will include an array of its 64-bit
         * snapshot ids, followed by the names of those snapshots as
         * a contiguous block of NUL-terminated strings.  Note that
         * the number of snapshots could change by the time we read
         * it in, in which case we re-read it.
         */
        do {
                size_t size;

                /* Free the buffer from the previous (mis-sized) pass */
                kfree(ondisk);

                size = sizeof (*ondisk);
                size += snap_count * sizeof (struct rbd_image_snap_ondisk);
                size += names_size;
                ondisk = kmalloc(size, GFP_KERNEL);
                if (!ondisk)
                        return ERR_PTR(-ENOMEM);

                ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
                                       rbd_dev->header_name,
                                       0, size,
                                       (char *) ondisk, version);

                if (ret < 0)
                        goto out_err;
                /* Fewer bytes than the sized buffer is unexpected */
                if (WARN_ON((size_t) ret < size)) {
                        ret = -ENXIO;
                        pr_warning("short header read for image %s"
                                        " (want %zd got %d)\n",
                                rbd_dev->spec->image_name, size, ret);
                        goto out_err;
                }
                if (!rbd_dev_ondisk_valid(ondisk)) {
                        ret = -ENXIO;
                        pr_warning("invalid header for image %s\n",
                                rbd_dev->spec->image_name);
                        goto out_err;
                }

                /* Loop again if the snapshot count changed meanwhile */
                names_size = le64_to_cpu(ondisk->snap_names_len);
                want_count = snap_count;
                snap_count = le32_to_cpu(ondisk->snap_count);
        } while (snap_count != want_count);

        return ondisk;

out_err:
        kfree(ondisk);

        return ERR_PTR(ret);
}
1777
1778 /*
1779  * reload the ondisk the header
1780  */
1781 static int rbd_read_header(struct rbd_device *rbd_dev,
1782                            struct rbd_image_header *header)
1783 {
1784         struct rbd_image_header_ondisk *ondisk;
1785         u64 ver = 0;
1786         int ret;
1787
1788         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
1789         if (IS_ERR(ondisk))
1790                 return PTR_ERR(ondisk);
1791         ret = rbd_header_from_disk(header, ondisk);
1792         if (ret >= 0)
1793                 header->obj_version = ver;
1794         kfree(ondisk);
1795
1796         return ret;
1797 }
1798
/* Tear down every snapshot device on the device's snapshot list */
static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
{
        struct rbd_snap *snap;
        struct rbd_snap *next;

        list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
                rbd_remove_snap_dev(snap);
}
1807
1808 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
1809 {
1810         sector_t size;
1811
1812         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
1813                 return;
1814
1815         size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
1816         dout("setting size to %llu sectors", (unsigned long long) size);
1817         rbd_dev->mapping.size = (u64) size;
1818         set_capacity(rbd_dev->disk, size);
1819 }
1820
1821 /*
1822  * only read the first part of the ondisk header, without the snaps info
1823  */
1824 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
1825 {
1826         int ret;
1827         struct rbd_image_header h;
1828
1829         ret = rbd_read_header(rbd_dev, &h);
1830         if (ret < 0)
1831                 return ret;
1832
1833         down_write(&rbd_dev->header_rwsem);
1834
1835         /* Update image size, and check for resize of mapped image */
1836         rbd_dev->header.image_size = h.image_size;
1837         rbd_update_mapping_size(rbd_dev);
1838
1839         /* rbd_dev->header.object_prefix shouldn't change */
1840         kfree(rbd_dev->header.snap_sizes);
1841         kfree(rbd_dev->header.snap_names);
1842         /* osd requests may still refer to snapc */
1843         ceph_put_snap_context(rbd_dev->header.snapc);
1844
1845         if (hver)
1846                 *hver = h.obj_version;
1847         rbd_dev->header.obj_version = h.obj_version;
1848         rbd_dev->header.image_size = h.image_size;
1849         rbd_dev->header.snapc = h.snapc;
1850         rbd_dev->header.snap_names = h.snap_names;
1851         rbd_dev->header.snap_sizes = h.snap_sizes;
1852         /* Free the extra copy of the object prefix */
1853         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1854         kfree(h.object_prefix);
1855
1856         ret = rbd_dev_snaps_update(rbd_dev);
1857         if (!ret)
1858                 ret = rbd_dev_snaps_register(rbd_dev);
1859
1860         up_write(&rbd_dev->header_rwsem);
1861
1862         return ret;
1863 }
1864
1865 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
1866 {
1867         int ret;
1868
1869         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1870         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1871         if (rbd_dev->image_format == 1)
1872                 ret = rbd_dev_v1_refresh(rbd_dev, hver);
1873         else
1874                 ret = rbd_dev_v2_refresh(rbd_dev, hver);
1875         mutex_unlock(&ctl_mutex);
1876
1877         return ret;
1878 }
1879
/*
 * Allocate and set up the gendisk and request queue for this rbd
 * device, sizing I/O limits to the rados object size.  The disk is
 * not yet added to the system; the caller does that.  Returns 0 on
 * success, -ENOMEM on any allocation failure.
 */
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
        struct gendisk *disk;
        struct request_queue *q;
        u64 segment_size;

        /* create gendisk info */
        disk = alloc_disk(RBD_MINORS_PER_MAJOR);
        if (!disk)
                return -ENOMEM;

        snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
                 rbd_dev->dev_id);
        disk->major = rbd_dev->major;
        disk->first_minor = 0;
        disk->fops = &rbd_bd_ops;
        disk->private_data = rbd_dev;

        /* init rq */
        q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
        if (!q)
                goto out_disk;

        /* We use the default size, but let's be explicit about it. */
        blk_queue_physical_block_size(q, SECTOR_SIZE);

        /* set io sizes to object size */
        segment_size = rbd_obj_bytes(&rbd_dev->header);
        blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
        blk_queue_max_segment_size(q, segment_size);
        blk_queue_io_min(q, segment_size);
        blk_queue_io_opt(q, segment_size);

        /* keep bios from spanning object boundaries */
        blk_queue_merge_bvec(q, rbd_merge_bvec);
        disk->queue = q;

        q->queuedata = rbd_dev;

        rbd_dev->disk = disk;

        /* mapping.size is bytes; capacity is in 512-byte sectors */
        set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);

        return 0;
out_disk:
        put_disk(disk);

        return -ENOMEM;
}
1928
1929 /*
1930   sysfs
1931 */
1932
/* Map a sysfs struct device back to its embedding rbd_device. */
static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
        return container_of(dev, struct rbd_device, dev);
}
1937
1938 static ssize_t rbd_size_show(struct device *dev,
1939                              struct device_attribute *attr, char *buf)
1940 {
1941         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1942         sector_t size;
1943
1944         down_read(&rbd_dev->header_rwsem);
1945         size = get_capacity(rbd_dev->disk);
1946         up_read(&rbd_dev->header_rwsem);
1947
1948         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1949 }
1950
1951 /*
1952  * Note this shows the features for whatever's mapped, which is not
1953  * necessarily the base image.
1954  */
1955 static ssize_t rbd_features_show(struct device *dev,
1956                              struct device_attribute *attr, char *buf)
1957 {
1958         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1959
1960         return sprintf(buf, "0x%016llx\n",
1961                         (unsigned long long) rbd_dev->mapping.features);
1962 }
1963
1964 static ssize_t rbd_major_show(struct device *dev,
1965                               struct device_attribute *attr, char *buf)
1966 {
1967         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1968
1969         return sprintf(buf, "%d\n", rbd_dev->major);
1970 }
1971
1972 static ssize_t rbd_client_id_show(struct device *dev,
1973                                   struct device_attribute *attr, char *buf)
1974 {
1975         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1976
1977         return sprintf(buf, "client%lld\n",
1978                         ceph_client_id(rbd_dev->rbd_client->client));
1979 }
1980
1981 static ssize_t rbd_pool_show(struct device *dev,
1982                              struct device_attribute *attr, char *buf)
1983 {
1984         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1985
1986         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
1987 }
1988
1989 static ssize_t rbd_pool_id_show(struct device *dev,
1990                              struct device_attribute *attr, char *buf)
1991 {
1992         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1993
1994         return sprintf(buf, "%llu\n",
1995                 (unsigned long long) rbd_dev->spec->pool_id);
1996 }
1997
1998 static ssize_t rbd_name_show(struct device *dev,
1999                              struct device_attribute *attr, char *buf)
2000 {
2001         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2002
2003         if (rbd_dev->spec->image_name)
2004                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
2005
2006         return sprintf(buf, "(unknown)\n");
2007 }
2008
2009 static ssize_t rbd_image_id_show(struct device *dev,
2010                              struct device_attribute *attr, char *buf)
2011 {
2012         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2013
2014         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
2015 }
2016
2017 /*
2018  * Shows the name of the currently-mapped snapshot (or
2019  * RBD_SNAP_HEAD_NAME for the base image).
2020  */
2021 static ssize_t rbd_snap_show(struct device *dev,
2022                              struct device_attribute *attr,
2023                              char *buf)
2024 {
2025         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2026
2027         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
2028 }
2029
2030 /*
2031  * For an rbd v2 image, shows the pool id, image id, and snapshot id
2032  * for the parent image.  If there is no parent, simply shows
2033  * "(no parent image)".
2034  */
2035 static ssize_t rbd_parent_show(struct device *dev,
2036                              struct device_attribute *attr,
2037                              char *buf)
2038 {
2039         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2040         struct rbd_spec *spec = rbd_dev->parent_spec;
2041         int count;
2042         char *bufp = buf;
2043
2044         if (!spec)
2045                 return sprintf(buf, "(no parent image)\n");
2046
2047         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
2048                         (unsigned long long) spec->pool_id, spec->pool_name);
2049         if (count < 0)
2050                 return count;
2051         bufp += count;
2052
2053         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
2054                         spec->image_name ? spec->image_name : "(unknown)");
2055         if (count < 0)
2056                 return count;
2057         bufp += count;
2058
2059         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
2060                         (unsigned long long) spec->snap_id, spec->snap_name);
2061         if (count < 0)
2062                 return count;
2063         bufp += count;
2064
2065         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
2066         if (count < 0)
2067                 return count;
2068         bufp += count;
2069
2070         return (ssize_t) (bufp - buf);
2071 }
2072
2073 static ssize_t rbd_image_refresh(struct device *dev,
2074                                  struct device_attribute *attr,
2075                                  const char *buf,
2076                                  size_t size)
2077 {
2078         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2079         int ret;
2080
2081         ret = rbd_dev_refresh(rbd_dev, NULL);
2082
2083         return ret < 0 ? ret : size;
2084 }
2085
/* Per-device sysfs attributes; all read-only except "refresh". */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
2097
/* Attribute list registered for every rbd device. */
static struct attribute *rbd_attrs[] = {
        &dev_attr_size.attr,
        &dev_attr_features.attr,
        &dev_attr_major.attr,
        &dev_attr_client_id.attr,
        &dev_attr_pool.attr,
        &dev_attr_pool_id.attr,
        &dev_attr_name.attr,
        &dev_attr_image_id.attr,
        &dev_attr_current_snap.attr,
        &dev_attr_parent.attr,
        &dev_attr_refresh.attr,
        NULL
};
2112
/* Group wrappers so the attributes are created with the device. */
static struct attribute_group rbd_attr_group = {
        .attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
        &rbd_attr_group,
        NULL
};
2121
/*
 * Empty release: the struct device is embedded in the rbd_device,
 * whose lifetime is managed separately, so there is nothing to free
 * here.  The driver core requires a release callback regardless.
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
}
2125
/* Device type for rbd devices; hooks up the sysfs attribute groups. */
static struct device_type rbd_device_type = {
        .name           = "rbd",
        .groups         = rbd_attr_groups,
        .release        = rbd_sysfs_dev_release,
};
2131
2132
2133 /*
2134   sysfs - snapshots
2135 */
2136
2137 static ssize_t rbd_snap_size_show(struct device *dev,
2138                                   struct device_attribute *attr,
2139                                   char *buf)
2140 {
2141         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2142
2143         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2144 }
2145
2146 static ssize_t rbd_snap_id_show(struct device *dev,
2147                                 struct device_attribute *attr,
2148                                 char *buf)
2149 {
2150         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2151
2152         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2153 }
2154
2155 static ssize_t rbd_snap_features_show(struct device *dev,
2156                                 struct device_attribute *attr,
2157                                 char *buf)
2158 {
2159         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2160
2161         return sprintf(buf, "0x%016llx\n",
2162                         (unsigned long long) snap->features);
2163 }
2164
/* Read-only sysfs attributes exposed for each snapshot device. */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
        &dev_attr_snap_size.attr,
        &dev_attr_snap_id.attr,
        &dev_attr_snap_features.attr,
        NULL,
};

static struct attribute_group rbd_snap_attr_group = {
        .attrs = rbd_snap_attrs,
};
2179
/*
 * Device-core release callback for a snapshot device: the rbd_snap
 * embeds the struct device, so the snap (and its name) is freed when
 * the last reference to the device goes away.
 */
static void rbd_snap_dev_release(struct device *dev)
{
        struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
        kfree(snap->name);
        kfree(snap);
}
2186
/* Device type for snapshot devices; wires up attributes and release. */
static const struct attribute_group *rbd_snap_attr_groups[] = {
        &rbd_snap_attr_group,
        NULL
};

static struct device_type rbd_snap_device_type = {
        .groups         = rbd_snap_attr_groups,
        .release        = rbd_snap_dev_release,
};
2196
/* Take a reference on an rbd_spec; returns the spec for convenience. */
static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
{
        kref_get(&spec->kref);

        return spec;
}
2203
static void rbd_spec_free(struct kref *kref);
/* Drop a reference on an rbd_spec; NULL is silently ignored. */
static void rbd_spec_put(struct rbd_spec *spec)
{
        if (spec)
                kref_put(&spec->kref, rbd_spec_free);
}
2210
/*
 * Allocate a zeroed rbd_spec with a single reference held by the
 * caller.  Returns NULL on allocation failure.
 */
static struct rbd_spec *rbd_spec_alloc(void)
{
        struct rbd_spec *spec;

        spec = kzalloc(sizeof (*spec), GFP_KERNEL);
        if (!spec)
                return NULL;
        kref_init(&spec->kref);

        /* get+put is a refcount no-op; exercises the kref paths only */
        rbd_spec_put(rbd_spec_get(spec));       /* TEMPORARY */

        return spec;
}
2224
/* kref release callback: free the spec and all its owned strings. */
static void rbd_spec_free(struct kref *kref)
{
        struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);

        kfree(spec->pool_name);
        kfree(spec->image_id);
        kfree(spec->image_name);
        kfree(spec->snap_name);
        kfree(spec);
}
2235
/*
 * Allocate and initialize an rbd_device.  On success the new device
 * takes over the caller's references to both rbdc and spec (they are
 * dropped by rbd_dev_destroy()).  Returns NULL on allocation failure,
 * in which case the caller keeps its references.
 */
struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
                                struct rbd_spec *spec)
{
        struct rbd_device *rbd_dev;

        rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
        if (!rbd_dev)
                return NULL;

        spin_lock_init(&rbd_dev->lock);
        INIT_LIST_HEAD(&rbd_dev->node);
        INIT_LIST_HEAD(&rbd_dev->snaps);
        init_rwsem(&rbd_dev->header_rwsem);

        rbd_dev->spec = spec;
        rbd_dev->rbd_client = rbdc;

        return rbd_dev;
}
2255
/*
 * Free an rbd_device, dropping the client and spec references taken
 * over in rbd_dev_create() plus any parent spec and header name.
 */
static void rbd_dev_destroy(struct rbd_device *rbd_dev)
{
        rbd_spec_put(rbd_dev->parent_spec);
        kfree(rbd_dev->header_name);
        rbd_put_client(rbd_dev->rbd_client);
        rbd_spec_put(rbd_dev->spec);
        kfree(rbd_dev);
}
2264
/*
 * Whether this snapshot's device has been registered with the driver
 * core.  The device type is only assigned at registration time, so
 * the two conditions must agree; the assert catches any divergence
 * (!ret ^ reg is true only when both are true or both are false).
 */
static bool rbd_snap_registered(struct rbd_snap *snap)
{
        bool ret = snap->dev.type == &rbd_snap_device_type;
        bool reg = device_is_registered(&snap->dev);

        rbd_assert(!ret ^ reg);

        return ret;
}
2274
/*
 * Unlink a snapshot from its device's list and unregister its sysfs
 * device (freeing the snap via the release callback).  An unregistered
 * snap is only unlinked; it must be freed by the caller.
 */
static void rbd_remove_snap_dev(struct rbd_snap *snap)
{
        list_del(&snap->node);
        if (device_is_registered(&snap->dev))
                device_unregister(&snap->dev);
}
2281
2282 static int rbd_register_snap_dev(struct rbd_snap *snap,
2283                                   struct device *parent)
2284 {
2285         struct device *dev = &snap->dev;
2286         int ret;
2287
2288         dev->type = &rbd_snap_device_type;
2289         dev->parent = parent;
2290         dev->release = rbd_snap_dev_release;
2291         dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
2292         dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2293
2294         ret = device_register(dev);
2295
2296         return ret;
2297 }
2298
/*
 * Allocate and fill in a new rbd_snap (not yet linked or registered).
 * The snapshot name is duplicated, so the caller keeps ownership of
 * snap_name.  Returns the new snap or an ERR_PTR on failure.
 */
static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
                                                const char *snap_name,
                                                u64 snap_id, u64 snap_size,
                                                u64 snap_features)
{
        struct rbd_snap *snap;
        int ret;

        snap = kzalloc(sizeof (*snap), GFP_KERNEL);
        if (!snap)
                return ERR_PTR(-ENOMEM);

        ret = -ENOMEM;
        snap->name = kstrdup(snap_name, GFP_KERNEL);
        if (!snap->name)
                goto err;

        snap->id = snap_id;
        snap->size = snap_size;
        snap->features = snap_features;

        return snap;

err:
        /* kfree(NULL) is safe if the name allocation failed */
        kfree(snap->name);
        kfree(snap);

        return ERR_PTR(ret);
}
2328
2329 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2330                 u64 *snap_size, u64 *snap_features)
2331 {
2332         char *snap_name;
2333
2334         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2335
2336         *snap_size = rbd_dev->header.snap_sizes[which];
2337         *snap_features = 0;     /* No features for v1 */
2338
2339         /* Skip over names until we find the one we are looking for */
2340
2341         snap_name = rbd_dev->header.snap_names;
2342         while (which--)
2343                 snap_name += strlen(snap_name) + 1;
2344
2345         return snap_name;
2346 }
2347
2348 /*
2349  * Get the size and object order for an image snapshot, or if
2350  * snap_id is CEPH_NOSNAP, gets this information for the base
2351  * image.
2352  */
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u8 *order, u64 *snap_size)
{
        __le64 snapid = cpu_to_le64(snap_id);
        int ret;
        /* reply layout of the "get_size" class method */
        struct {
                u8 order;
                __le64 size;
        } __attribute__ ((packed)) size_buf = { 0 };

        ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_size",
                                (char *) &snapid, sizeof (snapid),
                                (char *) &size_buf, sizeof (size_buf),
                                CEPH_OSD_FLAG_READ, NULL);
        dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
        if (ret < 0)
                return ret;

        /* order is a single byte; no byte swapping needed */
        *order = size_buf.order;
        *snap_size = le64_to_cpu(size_buf.size);

        dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
                (unsigned long long) snap_id, (unsigned int) *order,
                (unsigned long long) *snap_size);

        return 0;
}
2381
/* Fetch the base image's object order and size into the header. */
static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
{
        return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
                                        &rbd_dev->header.obj_order,
                                        &rbd_dev->header.image_size);
}
2388
/*
 * Fetch the object name prefix for a format 2 image into
 * rbd_dev->header.object_prefix (newly allocated; caller's header
 * owns it).  Returns 0 on success or a negative errno.
 */
static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
{
        void *reply_buf;
        int ret;
        void *p;

        reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
        if (!reply_buf)
                return -ENOMEM;

        ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_object_prefix",
                                NULL, 0,
                                reply_buf, RBD_OBJ_PREFIX_LEN_MAX,
                                CEPH_OSD_FLAG_READ, NULL);
        dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
        if (ret < 0)
                goto out;
        ret = 0;    /* rbd_req_sync_exec() can return positive */

        /* decode the length-prefixed string into a fresh allocation */
        /* NOTE(review): GFP_NOIO here while siblings use GFP_KERNEL — confirm intent */
        p = reply_buf;
        rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
                                                p + RBD_OBJ_PREFIX_LEN_MAX,
                                                NULL, GFP_NOIO);

        if (IS_ERR(rbd_dev->header.object_prefix)) {
                ret = PTR_ERR(rbd_dev->header.object_prefix);
                rbd_dev->header.object_prefix = NULL;
        } else {
                dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
        }

out:
        kfree(reply_buf);

        return ret;
}
2426
/*
 * Fetch the feature bits for the given snapshot (or the base image
 * for CEPH_NOSNAP).  Fails with -ENXIO if the image uses incompatible
 * features this driver does not implement.
 */
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                u64 *snap_features)
{
        __le64 snapid = cpu_to_le64(snap_id);
        /* reply layout of the "get_features" class method */
        struct {
                __le64 features;
                __le64 incompat;
        } features_buf = { 0 };
        u64 incompat;
        int ret;

        ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_features",
                                (char *) &snapid, sizeof (snapid),
                                (char *) &features_buf, sizeof (features_buf),
                                CEPH_OSD_FLAG_READ, NULL);
        dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
        if (ret < 0)
                return ret;

        /* refuse to map an image we don't fully support */
        incompat = le64_to_cpu(features_buf.incompat);
        if (incompat & ~RBD_FEATURES_ALL)
                return -ENXIO;

        *snap_features = le64_to_cpu(features_buf.features);

        dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
                (unsigned long long) snap_id,
                (unsigned long long) *snap_features,
                (unsigned long long) le64_to_cpu(features_buf.incompat));

        return 0;
}
2460
/* Fetch the base image's feature bits into the header. */
static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
{
        return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
                                                &rbd_dev->header.features);
}
2466
/*
 * Query the osds for this image's parent (layering) information and
 * record it in rbd_dev->parent_spec and ->parent_overlap.  An image
 * with no parent is not an error.  Returns 0 on success or a
 * negative errno.
 */
static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
{
        struct rbd_spec *parent_spec;
        size_t size;
        void *reply_buf = NULL;
        __le64 snapid;
        void *p;
        void *end;
        char *image_id;
        u64 overlap;
        size_t len = 0;
        int ret;

        parent_spec = rbd_spec_alloc();
        if (!parent_spec)
                return -ENOMEM;

        /* worst-case encoded size of the "get_parent" reply */
        size = sizeof (__le64) +                                /* pool_id */
                sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
                sizeof (__le64) +                               /* snap_id */
                sizeof (__le64);                                /* overlap */
        reply_buf = kmalloc(size, GFP_KERNEL);
        if (!reply_buf) {
                ret = -ENOMEM;
                goto out_err;
        }

        snapid = cpu_to_le64(CEPH_NOSNAP);
        ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_parent",
                                (char *) &snapid, sizeof (snapid),
                                (char *) reply_buf, size,
                                CEPH_OSD_FLAG_READ, NULL);
        dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
        if (ret < 0)
                goto out_err;

        /* -ERANGE covers any short buffer hit by the decode macros below */
        ret = -ERANGE;
        p = reply_buf;
        end = (char *) reply_buf + size;
        ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
        if (parent_spec->pool_id == CEPH_NOPOOL)
                goto out;       /* No parent?  No problem. */

        image_id = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
        if (IS_ERR(image_id)) {
                ret = PTR_ERR(image_id);
                goto out_err;
        }
        parent_spec->image_id = image_id;
        parent_spec->image_id_len = len;
        ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
        ceph_decode_64_safe(&p, end, overlap, out_err);

        rbd_dev->parent_overlap = overlap;
        rbd_dev->parent_spec = parent_spec;
        parent_spec = NULL;     /* rbd_dev now owns this */
out:
        ret = 0;
out_err:
        /* rbd_spec_put(NULL) is a no-op after ownership transfer */
        kfree(reply_buf);
        rbd_spec_put(parent_spec);

        return ret;
}
2532
/*
 * Look up the image name for this image id in the pool's rbd
 * directory object.  Returns a newly-allocated name string owned by
 * the caller, or NULL on any failure (best-effort; callers tolerate
 * a missing name).
 */
static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
{
        size_t image_id_size;
        char *image_id;
        void *p;
        void *end;
        size_t size;
        void *reply_buf = NULL;
        size_t len = 0;
        char *image_name = NULL;
        int ret;

        rbd_assert(!rbd_dev->spec->image_name);

        /* build the length-prefixed image id used as the request payload */
        image_id_size = sizeof (__le32) + rbd_dev->spec->image_id_len;
        image_id = kmalloc(image_id_size, GFP_KERNEL);
        if (!image_id)
                return NULL;

        p = image_id;
        end = (char *) image_id + image_id_size;
        ceph_encode_string(&p, end, rbd_dev->spec->image_id,
                                (u32) rbd_dev->spec->image_id_len);

        size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
        reply_buf = kmalloc(size, GFP_KERNEL);
        if (!reply_buf)
                goto out;

        ret = rbd_req_sync_exec(rbd_dev, RBD_DIRECTORY,
                                "rbd", "dir_get_name",
                                image_id, image_id_size,
                                (char *) reply_buf, size,
                                CEPH_OSD_FLAG_READ, NULL);
        if (ret < 0)
                goto out;
        p = reply_buf;
        end = (char *) reply_buf + size;
        image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
        if (IS_ERR(image_name))
                image_name = NULL;      /* failure is reported as NULL */
        else
                dout("%s: name is %s len is %zd\n", __func__, image_name, len);
out:
        kfree(reply_buf);
        kfree(image_id);

        return image_name;
}
2582
2583 /*
2584  * When a parent image gets probed, we only have the pool, image,
2585  * and snapshot ids but not the names of any of them.  This call
2586  * is made later to fill in those names.  It has to be done after
2587  * rbd_dev_snaps_update() has completed because some of the
2588  * information (in particular, snapshot name) is not available
2589  * until then.
2590  */
2591 static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
2592 {
2593         struct ceph_osd_client *osdc;
2594         const char *name;
2595         void *reply_buf = NULL;
2596         int ret;
2597
2598         if (rbd_dev->spec->pool_name)
2599                 return 0;       /* Already have the names */
2600
2601         /* Look up the pool name */
2602
2603         osdc = &rbd_dev->rbd_client->client->osdc;
2604         name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
2605         if (!name)
2606                 return -EIO;    /* pool id too large (>= 2^31) */
2607
2608         rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
2609         if (!rbd_dev->spec->pool_name)
2610                 return -ENOMEM;
2611
2612         /* Fetch the image name; tolerate failure here */
2613
2614         name = rbd_dev_image_name(rbd_dev);
2615         if (name) {
2616                 rbd_dev->spec->image_name_len = strlen(name);
2617                 rbd_dev->spec->image_name = (char *) name;
2618         } else {
2619                 pr_warning(RBD_DRV_NAME "%d "
2620                         "unable to get image name for image id %s\n",
2621                         rbd_dev->major, rbd_dev->spec->image_id);
2622         }
2623
2624         /* Look up the snapshot name. */
2625
2626         name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
2627         if (!name) {
2628                 ret = -EIO;
2629                 goto out_err;
2630         }
2631         rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
2632         if(!rbd_dev->spec->snap_name)
2633                 goto out_err;
2634
2635         return 0;
2636 out_err:
2637         kfree(reply_buf);
2638         kfree(rbd_dev->spec->pool_name);
2639         rbd_dev->spec->pool_name = NULL;
2640
2641         return ret;
2642 }
2643
2644 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
2645 {
2646         size_t size;
2647         int ret;
2648         void *reply_buf;
2649         void *p;
2650         void *end;
2651         u64 seq;
2652         u32 snap_count;
2653         struct ceph_snap_context *snapc;
2654         u32 i;
2655
2656         /*
2657          * We'll need room for the seq value (maximum snapshot id),
2658          * snapshot count, and array of that many snapshot ids.
2659          * For now we have a fixed upper limit on the number we're
2660          * prepared to receive.
2661          */
2662         size = sizeof (__le64) + sizeof (__le32) +
2663                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
2664         reply_buf = kzalloc(size, GFP_KERNEL);
2665         if (!reply_buf)
2666                 return -ENOMEM;
2667
2668         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2669                                 "rbd", "get_snapcontext",
2670                                 NULL, 0,
2671                                 reply_buf, size,
2672                                 CEPH_OSD_FLAG_READ, ver);
2673         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2674         if (ret < 0)
2675                 goto out;
2676
2677         ret = -ERANGE;
2678         p = reply_buf;
2679         end = (char *) reply_buf + size;
2680         ceph_decode_64_safe(&p, end, seq, out);
2681         ceph_decode_32_safe(&p, end, snap_count, out);
2682
2683         /*
2684          * Make sure the reported number of snapshot ids wouldn't go
2685          * beyond the end of our buffer.  But before checking that,
2686          * make sure the computed size of the snapshot context we
2687          * allocate is representable in a size_t.
2688          */
2689         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
2690                                  / sizeof (u64)) {
2691                 ret = -EINVAL;
2692                 goto out;
2693         }
2694         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
2695                 goto out;
2696
2697         size = sizeof (struct ceph_snap_context) +
2698                                 snap_count * sizeof (snapc->snaps[0]);
2699         snapc = kmalloc(size, GFP_KERNEL);
2700         if (!snapc) {
2701                 ret = -ENOMEM;
2702                 goto out;
2703         }
2704
2705         atomic_set(&snapc->nref, 1);
2706         snapc->seq = seq;
2707         snapc->num_snaps = snap_count;
2708         for (i = 0; i < snap_count; i++)
2709                 snapc->snaps[i] = ceph_decode_64(&p);
2710
2711         rbd_dev->header.snapc = snapc;
2712
2713         dout("  snap context seq = %llu, snap_count = %u\n",
2714                 (unsigned long long) seq, (unsigned int) snap_count);
2715
2716 out:
2717         kfree(reply_buf);
2718
2719         return 0;
2720 }
2721
/*
 * Fetch the name of snapshot "which" (an index into the snapshot
 * context) for a format 2 image.  Returns a newly-allocated string
 * owned by the caller, or an ERR_PTR on failure.
 */
static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
{
        size_t size;
        void *reply_buf;
        __le64 snap_id;
        int ret;
        void *p;
        void *end;
        char *snap_name;

        /* worst-case length-prefixed name */
        size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
        reply_buf = kmalloc(size, GFP_KERNEL);
        if (!reply_buf)
                return ERR_PTR(-ENOMEM);

        snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
        ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_snapshot_name",
                                (char *) &snap_id, sizeof (snap_id),
                                reply_buf, size,
                                CEPH_OSD_FLAG_READ, NULL);
        dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
        if (ret < 0)
                goto out;

        p = reply_buf;
        end = (char *) reply_buf + size;
        snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
        if (IS_ERR(snap_name)) {
                ret = PTR_ERR(snap_name);
                goto out;
        } else {
                dout("  snap_id 0x%016llx snap_name = %s\n",
                        (unsigned long long) le64_to_cpu(snap_id), snap_name);
        }
        kfree(reply_buf);

        return snap_name;
out:
        kfree(reply_buf);

        return ERR_PTR(ret);
}
2765
2766 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
2767                 u64 *snap_size, u64 *snap_features)
2768 {
2769         __le64 snap_id;
2770         u8 order;
2771         int ret;
2772
2773         snap_id = rbd_dev->header.snapc->snaps[which];
2774         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
2775         if (ret)
2776                 return ERR_PTR(ret);
2777         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
2778         if (ret)
2779                 return ERR_PTR(ret);
2780
2781         return rbd_dev_v2_snap_name(rbd_dev, which);
2782 }
2783
2784 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
2785                 u64 *snap_size, u64 *snap_features)
2786 {
2787         if (rbd_dev->image_format == 1)
2788                 return rbd_dev_v1_snap_info(rbd_dev, which,
2789                                         snap_size, snap_features);
2790         if (rbd_dev->image_format == 2)
2791                 return rbd_dev_v2_snap_info(rbd_dev, which,
2792                                         snap_size, snap_features);
2793         return ERR_PTR(-EINVAL);
2794 }
2795
2796 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
2797 {
2798         int ret;
2799         __u8 obj_order;
2800
2801         down_write(&rbd_dev->header_rwsem);
2802
2803         /* Grab old order first, to see if it changes */
2804
2805         obj_order = rbd_dev->header.obj_order,
2806         ret = rbd_dev_v2_image_size(rbd_dev);
2807         if (ret)
2808                 goto out;
2809         if (rbd_dev->header.obj_order != obj_order) {
2810                 ret = -EIO;
2811                 goto out;
2812         }
2813         rbd_update_mapping_size(rbd_dev);
2814
2815         ret = rbd_dev_v2_snap_context(rbd_dev, hver);
2816         dout("rbd_dev_v2_snap_context returned %d\n", ret);
2817         if (ret)
2818                 goto out;
2819         ret = rbd_dev_snaps_update(rbd_dev);
2820         dout("rbd_dev_snaps_update returned %d\n", ret);
2821         if (ret)
2822                 goto out;
2823         ret = rbd_dev_snaps_register(rbd_dev);
2824         dout("rbd_dev_snaps_register returned %d\n", ret);
2825 out:
2826         up_write(&rbd_dev->header_rwsem);
2827
2828         return ret;
2829 }
2830
2831 /*
2832  * Scan the rbd device's current snapshot list and compare it to the
2833  * newly-received snapshot context.  Remove any existing snapshots
2834  * not present in the new snapshot context.  Add a new snapshot for
 * any snapshots in the snapshot context not in the current list.
2836  * And verify there are no changes to snapshots we already know
2837  * about.
2838  *
2839  * Assumes the snapshots in the snapshot context are sorted by
2840  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
2841  * are also maintained in that order.)
2842  */
2843 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
2844 {
2845         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2846         const u32 snap_count = snapc->num_snaps;
2847         struct list_head *head = &rbd_dev->snaps;
2848         struct list_head *links = head->next;
2849         u32 index = 0;
2850
2851         dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
2852         while (index < snap_count || links != head) {
2853                 u64 snap_id;
2854                 struct rbd_snap *snap;
2855                 char *snap_name;
2856                 u64 snap_size = 0;
2857                 u64 snap_features = 0;
2858
2859                 snap_id = index < snap_count ? snapc->snaps[index]
2860                                              : CEPH_NOSNAP;
2861                 snap = links != head ? list_entry(links, struct rbd_snap, node)
2862                                      : NULL;
2863                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
2864
2865                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2866                         struct list_head *next = links->next;
2867
2868                         /* Existing snapshot not in the new snap context */
2869
2870                         if (rbd_dev->spec->snap_id == snap->id)
2871                                 rbd_dev->exists = false;
2872                         rbd_remove_snap_dev(snap);
2873                         dout("%ssnap id %llu has been removed\n",
2874                                 rbd_dev->spec->snap_id == snap->id ?
2875                                                         "mapped " : "",
2876                                 (unsigned long long) snap->id);
2877
2878                         /* Done with this list entry; advance */
2879
2880                         links = next;
2881                         continue;
2882                 }
2883
2884                 snap_name = rbd_dev_snap_info(rbd_dev, index,
2885                                         &snap_size, &snap_features);
2886                 if (IS_ERR(snap_name))
2887                         return PTR_ERR(snap_name);
2888
2889                 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
2890                         (unsigned long long) snap_id);
2891                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2892                         struct rbd_snap *new_snap;
2893
2894                         /* We haven't seen this snapshot before */
2895
2896                         new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
2897                                         snap_id, snap_size, snap_features);
2898                         if (IS_ERR(new_snap)) {
2899                                 int err = PTR_ERR(new_snap);
2900
2901                                 dout("  failed to add dev, error %d\n", err);
2902
2903                                 return err;
2904                         }
2905
2906                         /* New goes before existing, or at end of list */
2907
2908                         dout("  added dev%s\n", snap ? "" : " at end\n");
2909                         if (snap)
2910                                 list_add_tail(&new_snap->node, &snap->node);
2911                         else
2912                                 list_add_tail(&new_snap->node, head);
2913                 } else {
2914                         /* Already have this one */
2915
2916                         dout("  already present\n");
2917
2918                         rbd_assert(snap->size == snap_size);
2919                         rbd_assert(!strcmp(snap->name, snap_name));
2920                         rbd_assert(snap->features == snap_features);
2921
2922                         /* Done with this list entry; advance */
2923
2924                         links = links->next;
2925                 }
2926
2927                 /* Advance to the next entry in the snapshot context */
2928
2929                 index++;
2930         }
2931         dout("%s: done\n", __func__);
2932
2933         return 0;
2934 }
2935
2936 /*
2937  * Scan the list of snapshots and register the devices for any that
2938  * have not already been registered.
2939  */
2940 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
2941 {
2942         struct rbd_snap *snap;
2943         int ret = 0;
2944
2945         dout("%s called\n", __func__);
2946         if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
2947                 return -EIO;
2948
2949         list_for_each_entry(snap, &rbd_dev->snaps, node) {
2950                 if (!rbd_snap_registered(snap)) {
2951                         ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2952                         if (ret < 0)
2953                                 break;
2954                 }
2955         }
2956         dout("%s: returning %d\n", __func__, ret);
2957
2958         return ret;
2959 }
2960
2961 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2962 {
2963         struct device *dev;
2964         int ret;
2965
2966         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2967
2968         dev = &rbd_dev->dev;
2969         dev->bus = &rbd_bus_type;
2970         dev->type = &rbd_device_type;
2971         dev->parent = &rbd_root_dev;
2972         dev->release = rbd_dev_release;
2973         dev_set_name(dev, "%d", rbd_dev->dev_id);
2974         ret = device_register(dev);
2975
2976         mutex_unlock(&ctl_mutex);
2977
2978         return ret;
2979 }
2980
/* Unregister the rbd device set up by rbd_bus_add_dev() */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
        device_unregister(&rbd_dev->dev);
}
2985
2986 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2987 {
2988         int ret, rc;
2989
2990         do {
2991                 ret = rbd_req_sync_watch(rbd_dev);
2992                 if (ret == -ERANGE) {
2993                         rc = rbd_dev_refresh(rbd_dev, NULL);
2994                         if (rc < 0)
2995                                 return rc;
2996                 }
2997         } while (ret == -ERANGE);
2998
2999         return ret;
3000 }
3001
/* Highest device id handed out so far; see rbd_dev_id_get()/_put() */
static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
3003
3004 /*
3005  * Get a unique rbd identifier for the given new rbd_dev, and add
3006  * the rbd_dev to the global list.  The minimum rbd id is 1.
3007  */
3008 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
3009 {
3010         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
3011
3012         spin_lock(&rbd_dev_list_lock);
3013         list_add_tail(&rbd_dev->node, &rbd_dev_list);
3014         spin_unlock(&rbd_dev_list_lock);
3015         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
3016                 (unsigned long long) rbd_dev->dev_id);
3017 }
3018
3019 /*
3020  * Remove an rbd_dev from the global list, and record that its
3021  * identifier is no longer in use.
3022  */
3023 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
3024 {
3025         struct list_head *tmp;
3026         int rbd_id = rbd_dev->dev_id;
3027         int max_id;
3028
3029         rbd_assert(rbd_id > 0);
3030
3031         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
3032                 (unsigned long long) rbd_dev->dev_id);
3033         spin_lock(&rbd_dev_list_lock);
3034         list_del_init(&rbd_dev->node);
3035
3036         /*
3037          * If the id being "put" is not the current maximum, there
3038          * is nothing special we need to do.
3039          */
3040         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
3041                 spin_unlock(&rbd_dev_list_lock);
3042                 return;
3043         }
3044
3045         /*
3046          * We need to update the current maximum id.  Search the
3047          * list to find out what it is.  We're more likely to find
3048          * the maximum at the end, so search the list backward.
3049          */
3050         max_id = 0;
3051         list_for_each_prev(tmp, &rbd_dev_list) {
3052                 struct rbd_device *rbd_dev;
3053
3054                 rbd_dev = list_entry(tmp, struct rbd_device, node);
3055                 if (rbd_dev->dev_id > max_id)
3056                         max_id = rbd_dev->dev_id;
3057         }
3058         spin_unlock(&rbd_dev_list_lock);
3059
3060         /*
3061          * The max id could have been updated by rbd_dev_id_get(), in
3062          * which case it now accurately reflects the new maximum.
3063          * Be careful not to overwrite the maximum value in that
3064          * case.
3065          */
3066         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
3067         dout("  max dev id has been reset\n");
3068 }
3069
3070 /*
3071  * Skips over white space at *buf, and updates *buf to point to the
3072  * first found non-space character (if any). Returns the length of
3073  * the token (string of non-white space characters) found.  Note
3074  * that *buf must be terminated with '\0'.
3075  */
static inline size_t next_token(const char **buf)
{
        /*
         * The characters for which isspace() is nonzero in the
         * "C" and "POSIX" locales.
         */
        static const char spaces[] = " \f\n\r\t\v";

        *buf += strspn(*buf, spaces);   /* Skip to start of token */

        return strcspn(*buf, spaces);   /* Length of the token */
}
3088
3089 /*
3090  * Finds the next token in *buf, and if the provided token buffer is
3091  * big enough, copies the found token into it.  The result, if
3092  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
3093  * must be terminated with '\0' on entry.
3094  *
3095  * Returns the length of the token found (not including the '\0').
3096  * Return value will be 0 if no token is found, and it will be >=
3097  * token_size if the token would not fit.
3098  *
3099  * The *buf pointer will be updated to point beyond the end of the
3100  * found token.  Note that this occurs even if the token buffer is
3101  * too small to hold it.
3102  */
static inline size_t copy_token(const char **buf,
                                char *token,
                                size_t token_size)
{
        size_t len = next_token(buf);

        /* Copy only if the caller's buffer can hold it, NUL included */

        if (len < token_size) {
                memcpy(token, *buf, len);
                token[len] = '\0';
        }
        *buf += len;

        return len;
}
3118
3119 /*
3120  * Finds the next token in *buf, dynamically allocates a buffer big
3121  * enough to hold a copy of it, and copies the token into the new
3122  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
3123  * that a duplicate buffer is created even for a zero-length token.
3124  *
3125  * Returns a pointer to the newly-allocated duplicate, or a null
3126  * pointer if memory for the duplicate was not available.  If
3127  * the lenp argument is a non-null pointer, the length of the token
3128  * (not including the '\0') is returned in *lenp.
3129  *
3130  * If successful, the *buf pointer will be updated to point beyond
3131  * the end of the found token.
3132  *
3133  * Note: uses GFP_KERNEL for allocation.
3134  */
3135 static inline char *dup_token(const char **buf, size_t *lenp)
3136 {
3137         char *dup;
3138         size_t len;
3139
3140         len = next_token(buf);
3141         dup = kmalloc(len + 1, GFP_KERNEL);
3142         if (!dup)
3143                 return NULL;
3144
3145         memcpy(dup, *buf, len);
3146         *(dup + len) = '\0';
3147         *buf += len;
3148
3149         if (lenp)
3150                 *lenp = len;
3151
3152         return dup;
3153 }
3154
3155 /*
3156  * Parse the options provided for an "rbd add" (i.e., rbd image
3157  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
3158  * and the data written is passed here via a NUL-terminated buffer.
3159  * Returns 0 if successful or an error code otherwise.
3160  *
3161  * The information extracted from these options is recorded in
3162  * the other parameters which return dynamically-allocated
3163  * structures:
3164  *  ceph_opts
3165  *      The address of a pointer that will refer to a ceph options
3166  *      structure.  Caller must release the returned pointer using
3167  *      ceph_destroy_options() when it is no longer needed.
3168  *  rbd_opts
3169  *      Address of an rbd options pointer.  Fully initialized by
3170  *      this function; caller must release with kfree().
3171  *  spec
3172  *      Address of an rbd image specification pointer.  Fully
3173  *      initialized by this function based on parsed options.
3174  *      Caller must release with rbd_spec_put().
3175  *
3176  * The options passed take this form:
 *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
3178  * where:
3179  *  <mon_addrs>
3180  *      A comma-separated list of one or more monitor addresses.
3181  *      A monitor address is an ip address, optionally followed
3182  *      by a port number (separated by a colon).
3183  *        I.e.:  ip1[:port1][,ip2[:port2]...]
3184  *  <options>
3185  *      A comma-separated list of ceph and/or rbd options.
3186  *  <pool_name>
3187  *      The name of the rados pool containing the rbd image.
3188  *  <image_name>
3189  *      The name of the image in that pool to map.
 *  <snap_name>
 *      The name of an optional snapshot.  If provided, the mapping
 *      will present data from the image at the time that snapshot
 *      was created.  The image head is used if no snapshot name is
 *      provided.  Snapshot mappings are always read-only.
3195  */
static int rbd_add_parse_args(const char *buf,
                                struct ceph_options **ceph_opts,
                                struct rbd_options **opts,
                                struct rbd_spec **rbd_spec)
{
        size_t len;
        char *options;
        const char *mon_addrs;
        size_t mon_addrs_size;
        struct rbd_spec *spec = NULL;
        struct rbd_options *rbd_opts = NULL;
        struct ceph_options *copts;
        int ret;

        /* The first four tokens are required */

        len = next_token(&buf);
        if (!len)
                return -EINVAL; /* Missing monitor address(es) */
        mon_addrs = buf;
        /*
         * mon_addrs is not NUL-terminated at its end; the size
         * recorded here includes one extra byte so that
         * mon_addrs + mon_addrs_size - 1 below points just past
         * the last address character.
         */
        mon_addrs_size = len + 1;
        buf += len;

        ret = -EINVAL;
        options = dup_token(&buf, NULL);
        if (!options)
                return -ENOMEM;
        if (!*options)
                goto out_err;   /* Missing options */

        /* spec collects pool, image and snapshot names for the caller */

        spec = rbd_spec_alloc();
        if (!spec)
                goto out_mem;

        spec->pool_name = dup_token(&buf, NULL);
        if (!spec->pool_name)
                goto out_mem;
        if (!*spec->pool_name)
                goto out_err;   /* Missing pool name */

        spec->image_name = dup_token(&buf, &spec->image_name_len);
        if (!spec->image_name)
                goto out_mem;
        if (!*spec->image_name)
                goto out_err;   /* Missing image name */

        /*
         * Snapshot name is optional; default is to use "-"
         * (indicating the head/no snapshot).
         */
        len = next_token(&buf);
        if (!len) {
                buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
                len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
        } else if (len > RBD_MAX_SNAP_NAME_LEN) {
                ret = -ENAMETOOLONG;
                goto out_err;
        }
        spec->snap_name = kmalloc(len + 1, GFP_KERNEL);
        if (!spec->snap_name)
                goto out_mem;
        memcpy(spec->snap_name, buf, len);
        *(spec->snap_name + len) = '\0';

        /* Initialize all rbd options to the defaults */

        rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
        if (!rbd_opts)
                goto out_mem;

        rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;

        /* rbd-specific option tokens are routed to parse_rbd_opts_token() */

        copts = ceph_parse_options(options, mon_addrs,
                                        mon_addrs + mon_addrs_size - 1,
                                        parse_rbd_opts_token, rbd_opts);
        if (IS_ERR(copts)) {
                ret = PTR_ERR(copts);
                goto out_err;
        }
        kfree(options);

        /* Success: ownership of copts, rbd_opts and spec passes to caller */

        *ceph_opts = copts;
        *opts = rbd_opts;
        *rbd_spec = spec;

        return 0;
out_mem:
        ret = -ENOMEM;
out_err:
        kfree(rbd_opts);
        rbd_spec_put(spec);
        kfree(options);

        return ret;
}
3291
3292 /*
3293  * An rbd format 2 image has a unique identifier, distinct from the
3294  * name given to it by the user.  Internally, that identifier is
3295  * what's used to specify the names of objects related to the image.
3296  *
3297  * A special "rbd id" object is used to map an rbd image name to its
3298  * id.  If that object doesn't exist, then there is no v2 rbd image
3299  * with the supplied name.
3300  *
3301  * This function will record the given rbd_dev's image_id field if
3302  * it can be determined, and in that case will return 0.  If any
3303  * errors occur a negative errno will be returned and the rbd_dev's
3304  * image_id field will be unchanged (and should be NULL).
3305  */
static int rbd_dev_image_id(struct rbd_device *rbd_dev)
{
        int ret;
        size_t size;
        char *object_name;
        void *response;
        void *p;

        /*
         * When probing a parent image, the image id is already
         * known (and the image name likely is not).  There's no
         * need to fetch the image id again in this case.
         */
        if (rbd_dev->spec->image_id)
                return 0;

        /*
         * First, see if the format 2 image id file exists, and if
         * so, get the image's persistent id from it.
         */
        size = sizeof (RBD_ID_PREFIX) + rbd_dev->spec->image_name_len;
        object_name = kmalloc(size, GFP_NOIO);
        if (!object_name)
                return -ENOMEM;
        sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
        dout("rbd id object name is %s\n", object_name);

        /* Response will be an encoded string, which includes a length */

        size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
        response = kzalloc(size, GFP_NOIO);
        if (!response) {
                ret = -ENOMEM;
                goto out;
        }

        /*
         * NOTE(review): the response buffer is allocated with room for
         * the __le32 length prefix plus RBD_IMAGE_ID_LEN_MAX bytes, but
         * only RBD_IMAGE_ID_LEN_MAX bytes are offered to the osd call
         * and used as the decode bound below -- confirm the intended
         * buffer size.
         */
        ret = rbd_req_sync_exec(rbd_dev, object_name,
                                "rbd", "get_id",
                                NULL, 0,
                                response, RBD_IMAGE_ID_LEN_MAX,
                                CEPH_OSD_FLAG_READ, NULL);
        dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
        if (ret < 0)
                goto out;
        ret = 0;    /* rbd_req_sync_exec() can return positive */

        /* Decode the length-prefixed id string; result is kmalloc'd */

        p = response;
        rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
                                                p + RBD_IMAGE_ID_LEN_MAX,
                                                &rbd_dev->spec->image_id_len,
                                                GFP_NOIO);
        if (IS_ERR(rbd_dev->spec->image_id)) {
                ret = PTR_ERR(rbd_dev->spec->image_id);
                rbd_dev->spec->image_id = NULL;
        } else {
                dout("image_id is %s\n", rbd_dev->spec->image_id);
        }
out:
        kfree(response);
        kfree(object_name);

        return ret;
}
3369
3370 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
3371 {
3372         int ret;
3373         size_t size;
3374
3375         /* Version 1 images have no id; empty string is used */
3376
3377         rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
3378         if (!rbd_dev->spec->image_id)
3379                 return -ENOMEM;
3380         rbd_dev->spec->image_id_len = 0;
3381
3382         /* Record the header object name for this rbd image. */
3383
3384         size = rbd_dev->spec->image_name_len + sizeof (RBD_SUFFIX);
3385         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3386         if (!rbd_dev->header_name) {
3387                 ret = -ENOMEM;
3388                 goto out_err;
3389         }
3390         sprintf(rbd_dev->header_name, "%s%s",
3391                 rbd_dev->spec->image_name, RBD_SUFFIX);
3392
3393         /* Populate rbd image metadata */
3394
3395         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
3396         if (ret < 0)
3397                 goto out_err;
3398
3399         /* Version 1 images have no parent (no layering) */
3400
3401         rbd_dev->parent_spec = NULL;
3402         rbd_dev->parent_overlap = 0;
3403
3404         rbd_dev->image_format = 1;
3405
3406         dout("discovered version 1 image, header name is %s\n",
3407                 rbd_dev->header_name);
3408
3409         return 0;
3410
3411 out_err:
3412         kfree(rbd_dev->header_name);
3413         rbd_dev->header_name = NULL;
3414         kfree(rbd_dev->spec->image_id);
3415         rbd_dev->spec->image_id = NULL;
3416
3417         return ret;
3418 }
3419
3420 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
3421 {
3422         size_t size;
3423         int ret;
3424         u64 ver = 0;
3425
3426         /*
3427          * Image id was filled in by the caller.  Record the header
3428          * object name for this rbd image.
3429          */
3430         size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->spec->image_id_len;
3431         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3432         if (!rbd_dev->header_name)
3433                 return -ENOMEM;
3434         sprintf(rbd_dev->header_name, "%s%s",
3435                         RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
3436
3437         /* Get the size and object order for the image */
3438
3439         ret = rbd_dev_v2_image_size(rbd_dev);
3440         if (ret < 0)
3441                 goto out_err;
3442
3443         /* Get the object prefix (a.k.a. block_name) for the image */
3444
3445         ret = rbd_dev_v2_object_prefix(rbd_dev);
3446         if (ret < 0)
3447                 goto out_err;
3448
3449         /* Get the and check features for the image */
3450
3451         ret = rbd_dev_v2_features(rbd_dev);
3452         if (ret < 0)
3453                 goto out_err;
3454
3455         /* If the image supports layering, get the parent info */
3456
3457         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
3458                 ret = rbd_dev_v2_parent_info(rbd_dev);
3459                 if (ret < 0)
3460                         goto out_err;
3461         }
3462
3463         /* crypto and compression type aren't (yet) supported for v2 images */
3464
3465         rbd_dev->header.crypt_type = 0;
3466         rbd_dev->header.comp_type = 0;
3467
3468         /* Get the snapshot context, plus the header version */
3469
3470         ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
3471         if (ret)
3472                 goto out_err;
3473         rbd_dev->header.obj_version = ver;
3474
3475         rbd_dev->image_format = 2;
3476
3477         dout("discovered version 2 image, header name is %s\n",
3478                 rbd_dev->header_name);
3479
3480         return 0;
3481 out_err:
3482         rbd_dev->parent_overlap = 0;
3483         rbd_spec_put(rbd_dev->parent_spec);
3484         rbd_dev->parent_spec = NULL;
3485         kfree(rbd_dev->header_name);
3486         rbd_dev->header_name = NULL;
3487         kfree(rbd_dev->header.object_prefix);
3488         rbd_dev->header.object_prefix = NULL;
3489