]> git.openfabrics.org - ~shefty/rdma-dev.git/blob - drivers/block/rbd.c
6ec9d53806c5e791b5d56fa0ae27dae0e903db02
[~shefty/rdma-dev.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    Instructions for use
25    --------------------
26
27    1) Map a Linux block device to an existing rbd image.
28
29       Usage: <mon ip addr> <options> <pool name> <rbd image name> [snap name]
30
31       $ echo "192.168.0.1 name=admin rbd foo" > /sys/class/rbd/add
32
33       The snapshot name can be "-" or omitted to map the image read/write.
34
35    2) List all active blkdev<->object mappings.
36
37       In this example, we have performed step #1 twice, creating two blkdevs,
38       mapped to two separate rados objects in the rados rbd pool
39
40       $ cat /sys/class/rbd/list
41       #id     major   client_name     pool    name    snap    KB
42       0       254     client4143      rbd     foo     -      1024000
43
44       The columns, in order, are:
45       - blkdev unique id
46       - blkdev assigned major
47       - rados client id
48       - rados pool name
49       - rados block device name
50       - mapped snapshot ("-" if none)
51       - device size in KB
52
53
54    3) Create a snapshot.
55
56       Usage: <blkdev id> <snapname>
57
58       $ echo "0 mysnap" > /sys/class/rbd/snap_create
59
60
61    4) Listing a snapshot.
62
63       $ cat /sys/class/rbd/snaps_list
64       #id     snap    KB
65       0       -       1024000 (*)
66       0       foo     1024000
67
68       The columns, in order, are:
69       - blkdev unique id
70       - snapshot name, '-' means none (active read/write version)
71       - size of device at time of snapshot
72       - the (*) indicates this is the active version
73
74    5) Rollback to snapshot.
75
76       Usage: <blkdev id> <snapname>
77
78       $ echo "0 mysnap" > /sys/class/rbd/snap_rollback
79
80
81    6) Mapping an image using snapshot.
82
83       A snapshot mapping is read-only. This is being done by passing
84       snap=<snapname> to the options when adding a device.
85
86       $ echo "192.168.0.1 name=admin,snap=mysnap rbd foo" > /sys/class/rbd/add
87
88
89    7) Remove an active blkdev<->rbd image mapping.
90
91       In this example, we remove the mapping with blkdev unique id 1.
92
93       $ echo 1 > /sys/class/rbd/remove
94
95
96    NOTE:  The actual creation and deletion of rados objects is outside the scope
97    of this driver.
98
99  */
100
101 #include <linux/ceph/libceph.h>
102 #include <linux/ceph/osd_client.h>
103 #include <linux/ceph/mon_client.h>
104 #include <linux/ceph/decode.h>
105
106 #include <linux/kernel.h>
107 #include <linux/device.h>
108 #include <linux/module.h>
109 #include <linux/fs.h>
110 #include <linux/blkdev.h>
111
112 #include "rbd_types.h"
113
114 #define DRV_NAME "rbd"
115 #define DRV_NAME_LONG "rbd (rados block device)"
116
117 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
118
119 #define RBD_MAX_MD_NAME_LEN     (96 + sizeof(RBD_SUFFIX))
120 #define RBD_MAX_POOL_NAME_LEN   64
121 #define RBD_MAX_SNAP_NAME_LEN   32
122 #define RBD_MAX_OPT_LEN         1024
123
124 #define RBD_SNAP_HEAD_NAME      "-"
125
126 #define DEV_NAME_LEN            32
127
128 /*
129  * block device image metadata (in-memory version)
130  */
131 struct rbd_image_header {
132         u64 image_size;
133         char block_name[32];
134         __u8 obj_order;
135         __u8 crypt_type;
136         __u8 comp_type;
137         struct rw_semaphore snap_rwsem;
138         struct ceph_snap_context *snapc;
139         size_t snap_names_len;
140         u64 snap_seq;
141         u32 total_snaps;
142
143         char *snap_names;
144         u64 *snap_sizes;
145 };
146
147 /*
148  * an instance of the client.  multiple devices may share a client.
149  */
150 struct rbd_client {
151         struct ceph_client      *client;
152         struct kref             kref;
153         struct list_head        node;
154 };
155
156 /*
157  * a single io request
158  */
159 struct rbd_request {
160         struct request          *rq;            /* blk layer request */
161         struct bio              *bio;           /* cloned bio */
162         struct page             **pages;        /* list of used pages */
163         u64                     len;
164 };
165
166 /*
167  * a single device
168  */
169 struct rbd_device {
170         int                     id;             /* blkdev unique id */
171
172         int                     major;          /* blkdev assigned major */
173         struct gendisk          *disk;          /* blkdev's gendisk and rq */
174         struct request_queue    *q;
175
176         struct ceph_client      *client;
177         struct rbd_client       *rbd_client;
178
179         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
180
181         spinlock_t              lock;           /* queue lock */
182
183         struct rbd_image_header header;
184         char                    obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
185         int                     obj_len;
186         char                    obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
187         char                    pool_name[RBD_MAX_POOL_NAME_LEN];
188         int                     poolid;
189
190         char                    snap_name[RBD_MAX_SNAP_NAME_LEN];
191         u32 cur_snap;   /* index+1 of current snapshot within snap context
192                            0 - for the head */
193         int read_only;
194
195         struct list_head        node;
196 };
197
198 static spinlock_t node_lock;      /* protects client get/put */
199
200 static struct class *class_rbd;   /* /sys/class/rbd */
201 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
202 static LIST_HEAD(rbd_dev_list);    /* devices */
203 static LIST_HEAD(rbd_client_list);      /* clients */
204
205
206 static int rbd_open(struct block_device *bdev, fmode_t mode)
207 {
208         struct gendisk *disk = bdev->bd_disk;
209         struct rbd_device *rbd_dev = disk->private_data;
210
211         set_device_ro(bdev, rbd_dev->read_only);
212
213         if ((mode & FMODE_WRITE) && rbd_dev->read_only)
214                 return -EROFS;
215
216         return 0;
217 }
218
219 static const struct block_device_operations rbd_bd_ops = {
220         .owner                  = THIS_MODULE,
221         .open                   = rbd_open,
222 };
223
224 /*
225  * Initialize an rbd client instance.
226  * We own *opt.
227  */
228 static struct rbd_client *rbd_client_create(struct ceph_options *opt)
229 {
230         struct rbd_client *rbdc;
231         int ret = -ENOMEM;
232
233         dout("rbd_client_create\n");
234         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
235         if (!rbdc)
236                 goto out_opt;
237
238         kref_init(&rbdc->kref);
239         INIT_LIST_HEAD(&rbdc->node);
240
241         rbdc->client = ceph_create_client(opt, rbdc);
242         if (IS_ERR(rbdc->client))
243                 goto out_rbdc;
244         opt = NULL; /* Now rbdc->client is responsible for opt */
245
246         ret = ceph_open_session(rbdc->client);
247         if (ret < 0)
248                 goto out_err;
249
250         spin_lock(&node_lock);
251         list_add_tail(&rbdc->node, &rbd_client_list);
252         spin_unlock(&node_lock);
253
254         dout("rbd_client_create created %p\n", rbdc);
255         return rbdc;
256
257 out_err:
258         ceph_destroy_client(rbdc->client);
259 out_rbdc:
260         kfree(rbdc);
261 out_opt:
262         if (opt)
263                 ceph_destroy_options(opt);
264         return ERR_PTR(ret);
265 }
266
267 /*
268  * Find a ceph client with specific addr and configuration.
269  */
270 static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
271 {
272         struct rbd_client *client_node;
273
274         if (opt->flags & CEPH_OPT_NOSHARE)
275                 return NULL;
276
277         list_for_each_entry(client_node, &rbd_client_list, node)
278                 if (ceph_compare_options(opt, client_node->client) == 0)
279                         return client_node;
280         return NULL;
281 }
282
283 /*
284  * Get a ceph client with specific addr and configuration, if one does
285  * not exist create it.
286  */
287 static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
288                           char *options)
289 {
290         struct rbd_client *rbdc;
291         struct ceph_options *opt;
292         int ret;
293
294         ret = ceph_parse_options(&opt, options, mon_addr,
295                                  mon_addr + strlen(mon_addr), NULL, NULL);
296         if (ret < 0)
297                 return ret;
298
299         spin_lock(&node_lock);
300         rbdc = __rbd_client_find(opt);
301         if (rbdc) {
302                 ceph_destroy_options(opt);
303
304                 /* using an existing client */
305                 kref_get(&rbdc->kref);
306                 rbd_dev->rbd_client = rbdc;
307                 rbd_dev->client = rbdc->client;
308                 spin_unlock(&node_lock);
309                 return 0;
310         }
311         spin_unlock(&node_lock);
312
313         rbdc = rbd_client_create(opt);
314         if (IS_ERR(rbdc))
315                 return PTR_ERR(rbdc);
316
317         rbd_dev->rbd_client = rbdc;
318         rbd_dev->client = rbdc->client;
319         return 0;
320 }
321
322 /*
323  * Destroy ceph client
324  */
325 static void rbd_client_release(struct kref *kref)
326 {
327         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
328
329         dout("rbd_release_client %p\n", rbdc);
330         spin_lock(&node_lock);
331         list_del(&rbdc->node);
332         spin_unlock(&node_lock);
333
334         ceph_destroy_client(rbdc->client);
335         kfree(rbdc);
336 }
337
338 /*
339  * Drop reference to ceph client node. If it's not referenced anymore, release
340  * it.
341  */
342 static void rbd_put_client(struct rbd_device *rbd_dev)
343 {
344         kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
345         rbd_dev->rbd_client = NULL;
346         rbd_dev->client = NULL;
347 }
348
349
350 /*
351  * Create a new header structure, translate header format from the on-disk
352  * header.
353  */
354 static int rbd_header_from_disk(struct rbd_image_header *header,
355                                  struct rbd_image_header_ondisk *ondisk,
356                                  int allocated_snaps,
357                                  gfp_t gfp_flags)
358 {
359         int i;
360         u32 snap_count = le32_to_cpu(ondisk->snap_count);
361         int ret = -ENOMEM;
362
363         init_rwsem(&header->snap_rwsem);
364
365         header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
366         header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
367                                 snap_count *
368                                  sizeof(struct rbd_image_snap_ondisk),
369                                 gfp_flags);
370         if (!header->snapc)
371                 return -ENOMEM;
372         if (snap_count) {
373                 header->snap_names = kmalloc(header->snap_names_len,
374                                              GFP_KERNEL);
375                 if (!header->snap_names)
376                         goto err_snapc;
377                 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
378                                              GFP_KERNEL);
379                 if (!header->snap_sizes)
380                         goto err_names;
381         } else {
382                 header->snap_names = NULL;
383                 header->snap_sizes = NULL;
384         }
385         memcpy(header->block_name, ondisk->block_name,
386                sizeof(ondisk->block_name));
387
388         header->image_size = le64_to_cpu(ondisk->image_size);
389         header->obj_order = ondisk->options.order;
390         header->crypt_type = ondisk->options.crypt_type;
391         header->comp_type = ondisk->options.comp_type;
392
393         atomic_set(&header->snapc->nref, 1);
394         header->snap_seq = le64_to_cpu(ondisk->snap_seq);
395         header->snapc->num_snaps = snap_count;
396         header->total_snaps = snap_count;
397
398         if (snap_count &&
399             allocated_snaps == snap_count) {
400                 for (i = 0; i < snap_count; i++) {
401                         header->snapc->snaps[i] =
402                                 le64_to_cpu(ondisk->snaps[i].id);
403                         header->snap_sizes[i] =
404                                 le64_to_cpu(ondisk->snaps[i].image_size);
405                 }
406
407                 /* copy snapshot names */
408                 memcpy(header->snap_names, &ondisk->snaps[i],
409                         header->snap_names_len);
410         }
411
412         return 0;
413
414 err_names:
415         kfree(header->snap_names);
416 err_snapc:
417         kfree(header->snapc);
418         return ret;
419 }
420
421 static int snap_index(struct rbd_image_header *header, int snap_num)
422 {
423         return header->total_snaps - snap_num;
424 }
425
426 static u64 cur_snap_id(struct rbd_device *rbd_dev)
427 {
428         struct rbd_image_header *header = &rbd_dev->header;
429
430         if (!rbd_dev->cur_snap)
431                 return 0;
432
433         return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
434 }
435
436 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
437                         u64 *seq, u64 *size)
438 {
439         int i;
440         char *p = header->snap_names;
441
442         for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
443                 if (strcmp(snap_name, p) == 0)
444                         break;
445         }
446         if (i == header->total_snaps)
447                 return -ENOENT;
448         if (seq)
449                 *seq = header->snapc->snaps[i];
450
451         if (size)
452                 *size = header->snap_sizes[i];
453
454         return i;
455 }
456
457 static int rbd_header_set_snap(struct rbd_device *dev,
458                                const char *snap_name,
459                                u64 *size)
460 {
461         struct rbd_image_header *header = &dev->header;
462         struct ceph_snap_context *snapc = header->snapc;
463         int ret = -ENOENT;
464
465         down_write(&header->snap_rwsem);
466
467         if (!snap_name ||
468             !*snap_name ||
469             strcmp(snap_name, "-") == 0 ||
470             strcmp(snap_name, RBD_SNAP_HEAD_NAME) == 0) {
471                 if (header->total_snaps)
472                         snapc->seq = header->snap_seq;
473                 else
474                         snapc->seq = 0;
475                 dev->cur_snap = 0;
476                 dev->read_only = 0;
477                 if (size)
478                         *size = header->image_size;
479         } else {
480                 ret = snap_by_name(header, snap_name, &snapc->seq, size);
481                 if (ret < 0)
482                         goto done;
483
484                 dev->cur_snap = header->total_snaps - ret;
485                 dev->read_only = 1;
486         }
487
488         ret = 0;
489 done:
490         up_write(&header->snap_rwsem);
491         return ret;
492 }
493
494 static void rbd_header_free(struct rbd_image_header *header)
495 {
496         kfree(header->snapc);
497         kfree(header->snap_names);
498         kfree(header->snap_sizes);
499 }
500
501 /*
502  * get the actual striped segment name, offset and length
503  */
504 static u64 rbd_get_segment(struct rbd_image_header *header,
505                            const char *block_name,
506                            u64 ofs, u64 len,
507                            char *seg_name, u64 *segofs)
508 {
509         u64 seg = ofs >> header->obj_order;
510
511         if (seg_name)
512                 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
513                          "%s.%012llx", block_name, seg);
514
515         ofs = ofs & ((1 << header->obj_order) - 1);
516         len = min_t(u64, len, (1 << header->obj_order) - ofs);
517
518         if (segofs)
519                 *segofs = ofs;
520
521         return len;
522 }
523
524 /*
525  * bio helpers
526  */
527
528 static void bio_chain_put(struct bio *chain)
529 {
530         struct bio *tmp;
531
532         while (chain) {
533                 tmp = chain;
534                 chain = chain->bi_next;
535                 bio_put(tmp);
536         }
537 }
538
539 /*
540  * zeros a bio chain, starting at specific offset
541  */
542 static void zero_bio_chain(struct bio *chain, int start_ofs)
543 {
544         struct bio_vec *bv;
545         unsigned long flags;
546         void *buf;
547         int i;
548         int pos = 0;
549
550         while (chain) {
551                 bio_for_each_segment(bv, chain, i) {
552                         if (pos + bv->bv_len > start_ofs) {
553                                 int remainder = max(start_ofs - pos, 0);
554                                 buf = bvec_kmap_irq(bv, &flags);
555                                 memset(buf + remainder, 0,
556                                        bv->bv_len - remainder);
557                                 bvec_kunmap_irq(buf, &flags);
558                         }
559                         pos += bv->bv_len;
560                 }
561
562                 chain = chain->bi_next;
563         }
564 }
565
566 /*
567  * bio_chain_clone - clone a chain of bios up to a certain length.
568  * might return a bio_pair that will need to be released.
569  */
570 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
571                                    struct bio_pair **bp,
572                                    int len, gfp_t gfpmask)
573 {
574         struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
575         int total = 0;
576
577         if (*bp) {
578                 bio_pair_release(*bp);
579                 *bp = NULL;
580         }
581
582         while (old_chain && (total < len)) {
583                 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
584                 if (!tmp)
585                         goto err_out;
586
587                 if (total + old_chain->bi_size > len) {
588                         struct bio_pair *bp;
589
590                         /*
591                          * this split can only happen with a single paged bio,
592                          * split_bio will BUG_ON if this is not the case
593                          */
594                         dout("bio_chain_clone split! total=%d remaining=%d"
595                              "bi_size=%d\n",
596                              (int)total, (int)len-total,
597                              (int)old_chain->bi_size);
598
599                         /* split the bio. We'll release it either in the next
600                            call, or it will have to be released outside */
601                         bp = bio_split(old_chain, (len - total) / 512ULL);
602                         if (!bp)
603                                 goto err_out;
604
605                         __bio_clone(tmp, &bp->bio1);
606
607                         *next = &bp->bio2;
608                 } else {
609                         __bio_clone(tmp, old_chain);
610                         *next = old_chain->bi_next;
611                 }
612
613                 tmp->bi_bdev = NULL;
614                 gfpmask &= ~__GFP_WAIT;
615                 tmp->bi_next = NULL;
616
617                 if (!new_chain) {
618                         new_chain = tail = tmp;
619                 } else {
620                         tail->bi_next = tmp;
621                         tail = tmp;
622                 }
623                 old_chain = old_chain->bi_next;
624
625                 total += tmp->bi_size;
626         }
627
628         BUG_ON(total < len);
629
630         if (tail)
631                 tail->bi_next = NULL;
632
633         *old = old_chain;
634
635         return new_chain;
636
637 err_out:
638         dout("bio_chain_clone with err\n");
639         bio_chain_put(new_chain);
640         return NULL;
641 }
642
643 /*
644  * helpers for osd request op vectors.
645  */
646 static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
647                             int num_ops,
648                             int opcode,
649                             u32 payload_len)
650 {
651         *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
652                        GFP_NOIO);
653         if (!*ops)
654                 return -ENOMEM;
655         (*ops)[0].op = opcode;
656         /*
657          * op extent offset and length will be set later on
658          * in calc_raw_layout()
659          */
660         (*ops)[0].payload_len = payload_len;
661         return 0;
662 }
663
664 static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
665 {
666         kfree(ops);
667 }
668
669 /*
670  * Send ceph osd request
671  */
672 static int rbd_do_request(struct request *rq,
673                           struct rbd_device *dev,
674                           struct ceph_snap_context *snapc,
675                           u64 snapid,
676                           const char *obj, u64 ofs, u64 len,
677                           struct bio *bio,
678                           struct page **pages,
679                           int num_pages,
680                           int flags,
681                           struct ceph_osd_req_op *ops,
682                           int num_reply,
683                           void (*rbd_cb)(struct ceph_osd_request *req,
684                                          struct ceph_msg *msg))
685 {
686         struct ceph_osd_request *req;
687         struct ceph_file_layout *layout;
688         int ret;
689         u64 bno;
690         struct timespec mtime = CURRENT_TIME;
691         struct rbd_request *req_data;
692         struct ceph_osd_request_head *reqhead;
693         struct rbd_image_header *header = &dev->header;
694
695         ret = -ENOMEM;
696         req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
697         if (!req_data)
698                 goto done;
699
700         dout("rbd_do_request len=%lld ofs=%lld\n", len, ofs);
701
702         down_read(&header->snap_rwsem);
703
704         req = ceph_osdc_alloc_request(&dev->client->osdc, flags,
705                                       snapc,
706                                       ops,
707                                       false,
708                                       GFP_NOIO, pages, bio);
709         if (IS_ERR(req)) {
710                 up_read(&header->snap_rwsem);
711                 ret = PTR_ERR(req);
712                 goto done_pages;
713         }
714
715         req->r_callback = rbd_cb;
716
717         req_data->rq = rq;
718         req_data->bio = bio;
719         req_data->pages = pages;
720         req_data->len = len;
721
722         req->r_priv = req_data;
723
724         reqhead = req->r_request->front.iov_base;
725         reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
726
727         strncpy(req->r_oid, obj, sizeof(req->r_oid));
728         req->r_oid_len = strlen(req->r_oid);
729
730         layout = &req->r_file_layout;
731         memset(layout, 0, sizeof(*layout));
732         layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
733         layout->fl_stripe_count = cpu_to_le32(1);
734         layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
735         layout->fl_pg_preferred = cpu_to_le32(-1);
736         layout->fl_pg_pool = cpu_to_le32(dev->poolid);
737         ceph_calc_raw_layout(&dev->client->osdc, layout, snapid,
738                              ofs, &len, &bno, req, ops);
739
740         ceph_osdc_build_request(req, ofs, &len,
741                                 ops,
742                                 snapc,
743                                 &mtime,
744                                 req->r_oid, req->r_oid_len);
745         up_read(&header->snap_rwsem);
746
747         ret = ceph_osdc_start_request(&dev->client->osdc, req, false);
748         if (ret < 0)
749                 goto done_err;
750
751         if (!rbd_cb) {
752                 ret = ceph_osdc_wait_request(&dev->client->osdc, req);
753                 ceph_osdc_put_request(req);
754         }
755         return ret;
756
757 done_err:
758         bio_chain_put(req_data->bio);
759         ceph_osdc_put_request(req);
760 done_pages:
761         kfree(req_data);
762 done:
763         if (rq)
764                 blk_end_request(rq, ret, len);
765         return ret;
766 }
767
768 /*
769  * Ceph osd op callback
770  */
771 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
772 {
773         struct rbd_request *req_data = req->r_priv;
774         struct ceph_osd_reply_head *replyhead;
775         struct ceph_osd_op *op;
776         __s32 rc;
777         u64 bytes;
778         int read_op;
779
780         /* parse reply */
781         replyhead = msg->front.iov_base;
782         WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
783         op = (void *)(replyhead + 1);
784         rc = le32_to_cpu(replyhead->result);
785         bytes = le64_to_cpu(op->extent.length);
786         read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);
787
788         dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
789
790         if (rc == -ENOENT && read_op) {
791                 zero_bio_chain(req_data->bio, 0);
792                 rc = 0;
793         } else if (rc == 0 && read_op && bytes < req_data->len) {
794                 zero_bio_chain(req_data->bio, bytes);
795                 bytes = req_data->len;
796         }
797
798         blk_end_request(req_data->rq, rc, bytes);
799
800         if (req_data->bio)
801                 bio_chain_put(req_data->bio);
802
803         ceph_osdc_put_request(req);
804         kfree(req_data);
805 }
806
807 /*
808  * Do a synchronous ceph osd operation
809  */
810 static int rbd_req_sync_op(struct rbd_device *dev,
811                            struct ceph_snap_context *snapc,
812                            u64 snapid,
813                            int opcode,
814                            int flags,
815                            struct ceph_osd_req_op *orig_ops,
816                            int num_reply,
817                            const char *obj,
818                            u64 ofs, u64 len,
819                            char *buf)
820 {
821         int ret;
822         struct page **pages;
823         int num_pages;
824         struct ceph_osd_req_op *ops = orig_ops;
825         u32 payload_len;
826
827         num_pages = calc_pages_for(ofs , len);
828         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
829         if (IS_ERR(pages))
830                 return PTR_ERR(pages);
831
832         if (!orig_ops) {
833                 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
834                 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
835                 if (ret < 0)
836                         goto done;
837
838                 if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
839                         ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
840                         if (ret < 0)
841                                 goto done_ops;
842                 }
843         }
844
845         ret = rbd_do_request(NULL, dev, snapc, snapid,
846                           obj, ofs, len, NULL,
847                           pages, num_pages,
848                           flags,
849                           ops,
850                           2,
851                           NULL);
852         if (ret < 0)
853                 goto done_ops;
854
855         if ((flags & CEPH_OSD_FLAG_READ) && buf)
856                 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
857
858 done_ops:
859         if (!orig_ops)
860                 rbd_destroy_ops(ops);
861 done:
862         ceph_release_page_vector(pages, num_pages);
863         return ret;
864 }
865
866 /*
867  * Do an asynchronous ceph osd operation
868  */
869 static int rbd_do_op(struct request *rq,
870                      struct rbd_device *rbd_dev ,
871                      struct ceph_snap_context *snapc,
872                      u64 snapid,
873                      int opcode, int flags, int num_reply,
874                      u64 ofs, u64 len,
875                      struct bio *bio)
876 {
877         char *seg_name;
878         u64 seg_ofs;
879         u64 seg_len;
880         int ret;
881         struct ceph_osd_req_op *ops;
882         u32 payload_len;
883
884         seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
885         if (!seg_name)
886                 return -ENOMEM;
887
888         seg_len = rbd_get_segment(&rbd_dev->header,
889                                   rbd_dev->header.block_name,
890                                   ofs, len,
891                                   seg_name, &seg_ofs);
892
893         payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
894
895         ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
896         if (ret < 0)
897                 goto done;
898
899         /* we've taken care of segment sizes earlier when we
900            cloned the bios. We should never have a segment
901            truncated at this point */
902         BUG_ON(seg_len < len);
903
904         ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
905                              seg_name, seg_ofs, seg_len,
906                              bio,
907                              NULL, 0,
908                              flags,
909                              ops,
910                              num_reply,
911                              rbd_req_cb);
912 done:
913         kfree(seg_name);
914         return ret;
915 }
916
917 /*
918  * Request async osd write
919  */
920 static int rbd_req_write(struct request *rq,
921                          struct rbd_device *rbd_dev,
922                          struct ceph_snap_context *snapc,
923                          u64 ofs, u64 len,
924                          struct bio *bio)
925 {
926         return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
927                          CEPH_OSD_OP_WRITE,
928                          CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
929                          2,
930                          ofs, len, bio);
931 }
932
933 /*
934  * Request async osd read
935  */
936 static int rbd_req_read(struct request *rq,
937                          struct rbd_device *rbd_dev,
938                          u64 snapid,
939                          u64 ofs, u64 len,
940                          struct bio *bio)
941 {
942         return rbd_do_op(rq, rbd_dev, NULL,
943                          (snapid ? snapid : CEPH_NOSNAP),
944                          CEPH_OSD_OP_READ,
945                          CEPH_OSD_FLAG_READ,
946                          2,
947                          ofs, len, bio);
948 }
949
950 /*
951  * Request sync osd read
952  */
953 static int rbd_req_sync_read(struct rbd_device *dev,
954                           struct ceph_snap_context *snapc,
955                           u64 snapid,
956                           const char *obj,
957                           u64 ofs, u64 len,
958                           char *buf)
959 {
960         return rbd_req_sync_op(dev, NULL,
961                                (snapid ? snapid : CEPH_NOSNAP),
962                                CEPH_OSD_OP_READ,
963                                CEPH_OSD_FLAG_READ,
964                                NULL,
965                                1, obj, ofs, len, buf);
966 }
967
968 /*
969  * Request sync osd read
970  */
971 static int rbd_req_sync_rollback_obj(struct rbd_device *dev,
972                                      u64 snapid,
973                                      const char *obj)
974 {
975         struct ceph_osd_req_op *ops;
976         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_ROLLBACK, 0);
977         if (ret < 0)
978                 return ret;
979
980         ops[0].snap.snapid = snapid;
981
982         ret = rbd_req_sync_op(dev, NULL,
983                                CEPH_NOSNAP,
984                                0,
985                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
986                                ops,
987                                1, obj, 0, 0, NULL);
988
989         rbd_destroy_ops(ops);
990
991         if (ret < 0)
992                 return ret;
993
994         return ret;
995 }
996
997 /*
998  * Request sync osd read
999  */
1000 static int rbd_req_sync_exec(struct rbd_device *dev,
1001                              const char *obj,
1002                              const char *cls,
1003                              const char *method,
1004                              const char *data,
1005                              int len)
1006 {
1007         struct ceph_osd_req_op *ops;
1008         int cls_len = strlen(cls);
1009         int method_len = strlen(method);
1010         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1011                                     cls_len + method_len + len);
1012         if (ret < 0)
1013                 return ret;
1014
1015         ops[0].cls.class_name = cls;
1016         ops[0].cls.class_len = (__u8)cls_len;
1017         ops[0].cls.method_name = method;
1018         ops[0].cls.method_len = (__u8)method_len;
1019         ops[0].cls.argc = 0;
1020         ops[0].cls.indata = data;
1021         ops[0].cls.indata_len = len;
1022
1023         ret = rbd_req_sync_op(dev, NULL,
1024                                CEPH_NOSNAP,
1025                                0,
1026                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1027                                ops,
1028                                1, obj, 0, 0, NULL);
1029
1030         rbd_destroy_ops(ops);
1031
1032         dout("cls_exec returned %d\n", ret);
1033         return ret;
1034 }
1035
1036 /*
1037  * block device queue callback
1038  */
1039 static void rbd_rq_fn(struct request_queue *q)
1040 {
1041         struct rbd_device *rbd_dev = q->queuedata;
1042         struct request *rq;
1043         struct bio_pair *bp = NULL;
1044
1045         rq = blk_fetch_request(q);
1046
1047         while (1) {
1048                 struct bio *bio;
1049                 struct bio *rq_bio, *next_bio = NULL;
1050                 bool do_write;
1051                 int size, op_size = 0;
1052                 u64 ofs;
1053
1054                 /* peek at request from block layer */
1055                 if (!rq)
1056                         break;
1057
1058                 dout("fetched request\n");
1059
1060                 /* filter out block requests we don't understand */
1061                 if ((rq->cmd_type != REQ_TYPE_FS)) {
1062                         __blk_end_request_all(rq, 0);
1063                         goto next;
1064                 }
1065
1066                 /* deduce our operation (read, write) */
1067                 do_write = (rq_data_dir(rq) == WRITE);
1068
1069                 size = blk_rq_bytes(rq);
1070                 ofs = blk_rq_pos(rq) * 512ULL;
1071                 rq_bio = rq->bio;
1072                 if (do_write && rbd_dev->read_only) {
1073                         __blk_end_request_all(rq, -EROFS);
1074                         goto next;
1075                 }
1076
1077                 spin_unlock_irq(q->queue_lock);
1078
1079                 dout("%s 0x%x bytes at 0x%llx\n",
1080                      do_write ? "write" : "read",
1081                      size, blk_rq_pos(rq) * 512ULL);
1082
1083                 do {
1084                         /* a bio clone to be passed down to OSD req */
1085                         dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1086                         op_size = rbd_get_segment(&rbd_dev->header,
1087                                                   rbd_dev->header.block_name,
1088                                                   ofs, size,
1089                                                   NULL, NULL);
1090                         bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1091                                               op_size, GFP_ATOMIC);
1092                         if (!bio) {
1093                                 spin_lock_irq(q->queue_lock);
1094                                 __blk_end_request_all(rq, -ENOMEM);
1095                                 goto next;
1096                         }
1097
1098                         /* init OSD command: write or read */
1099                         if (do_write)
1100                                 rbd_req_write(rq, rbd_dev,
1101                                               rbd_dev->header.snapc,
1102                                               ofs,
1103                                               op_size, bio);
1104                         else
1105                                 rbd_req_read(rq, rbd_dev,
1106                                              cur_snap_id(rbd_dev),
1107                                              ofs,
1108                                              op_size, bio);
1109
1110                         size -= op_size;
1111                         ofs += op_size;
1112
1113                         rq_bio = next_bio;
1114                 } while (size > 0);
1115
1116                 if (bp)
1117                         bio_pair_release(bp);
1118
1119                 spin_lock_irq(q->queue_lock);
1120 next:
1121                 rq = blk_fetch_request(q);
1122         }
1123 }
1124
1125 /*
1126  * a queue callback. Makes sure that we don't create a bio that spans across
1127  * multiple osd objects. One exception would be with a single page bios,
1128  * which we handle later at bio_chain_clone
1129  */
1130 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1131                           struct bio_vec *bvec)
1132 {
1133         struct rbd_device *rbd_dev = q->queuedata;
1134         unsigned int chunk_sectors = 1 << (rbd_dev->header.obj_order - 9);
1135         sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1136         unsigned int bio_sectors = bmd->bi_size >> 9;
1137         int max;
1138
1139         max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
1140                                  + bio_sectors)) << 9;
1141         if (max < 0)
1142                 max = 0; /* bio_add cannot handle a negative return */
1143         if (max <= bvec->bv_len && bio_sectors == 0)
1144                 return bvec->bv_len;
1145         return max;
1146 }
1147
1148 static void rbd_free_disk(struct rbd_device *rbd_dev)
1149 {
1150         struct gendisk *disk = rbd_dev->disk;
1151
1152         if (!disk)
1153                 return;
1154
1155         rbd_header_free(&rbd_dev->header);
1156
1157         if (disk->flags & GENHD_FL_UP)
1158                 del_gendisk(disk);
1159         if (disk->queue)
1160                 blk_cleanup_queue(disk->queue);
1161         put_disk(disk);
1162 }
1163
1164 /*
1165  * reload the ondisk the header 
1166  */
1167 static int rbd_read_header(struct rbd_device *rbd_dev,
1168                            struct rbd_image_header *header)
1169 {
1170         ssize_t rc;
1171         struct rbd_image_header_ondisk *dh;
1172         int snap_count = 0;
1173         u64 snap_names_len = 0;
1174
1175         while (1) {
1176                 int len = sizeof(*dh) +
1177                           snap_count * sizeof(struct rbd_image_snap_ondisk) +
1178                           snap_names_len;
1179
1180                 rc = -ENOMEM;
1181                 dh = kmalloc(len, GFP_KERNEL);
1182                 if (!dh)
1183                         return -ENOMEM;
1184
1185                 rc = rbd_req_sync_read(rbd_dev,
1186                                        NULL, CEPH_NOSNAP,
1187                                        rbd_dev->obj_md_name,
1188                                        0, len,
1189                                        (char *)dh);
1190                 if (rc < 0)
1191                         goto out_dh;
1192
1193                 rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1194                 if (rc < 0)
1195                         goto out_dh;
1196
1197                 if (snap_count != header->total_snaps) {
1198                         snap_count = header->total_snaps;
1199                         snap_names_len = header->snap_names_len;
1200                         rbd_header_free(header);
1201                         kfree(dh);
1202                         continue;
1203                 }
1204                 break;
1205         }
1206
1207 out_dh:
1208         kfree(dh);
1209         return rc;
1210 }
1211
1212 /*
1213  * create a snapshot
1214  */
1215 static int rbd_header_add_snap(struct rbd_device *dev,
1216                                const char *snap_name,
1217                                gfp_t gfp_flags)
1218 {
1219         int name_len = strlen(snap_name);
1220         u64 new_snapid;
1221         int ret;
1222         void *data, *data_start, *data_end;
1223
1224         /* we should create a snapshot only if we're pointing at the head */
1225         if (dev->cur_snap)
1226                 return -EINVAL;
1227
1228         ret = ceph_monc_create_snapid(&dev->client->monc, dev->poolid,
1229                                       &new_snapid);
1230         dout("created snapid=%lld\n", new_snapid);
1231         if (ret < 0)
1232                 return ret;
1233
1234         data = kmalloc(name_len + 16, gfp_flags);
1235         if (!data)
1236                 return -ENOMEM;
1237
1238         data_start = data;
1239         data_end = data + name_len + 16;
1240
1241         ceph_encode_string_safe(&data, data_end, snap_name, name_len, bad);
1242         ceph_encode_64_safe(&data, data_end, new_snapid, bad);
1243
1244         ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
1245                                 data_start, data - data_start);
1246
1247         kfree(data_start);
1248
1249         if (ret < 0)
1250                 return ret;
1251
1252         dev->header.snapc->seq =  new_snapid;
1253
1254         return 0;
1255 bad:
1256         return -ERANGE;
1257 }
1258
1259 /*
1260  * only read the first part of the ondisk header, without the snaps info
1261  */
1262 static int rbd_update_snaps(struct rbd_device *rbd_dev)
1263 {
1264         int ret;
1265         struct rbd_image_header h;
1266         u64 snap_seq;
1267
1268         ret = rbd_read_header(rbd_dev, &h);
1269         if (ret < 0)
1270                 return ret;
1271
1272         down_write(&rbd_dev->header.snap_rwsem);
1273
1274         snap_seq = rbd_dev->header.snapc->seq;
1275
1276         kfree(rbd_dev->header.snapc);
1277         kfree(rbd_dev->header.snap_names);
1278         kfree(rbd_dev->header.snap_sizes);
1279
1280         rbd_dev->header.total_snaps = h.total_snaps;
1281         rbd_dev->header.snapc = h.snapc;
1282         rbd_dev->header.snap_names = h.snap_names;
1283         rbd_dev->header.snap_sizes = h.snap_sizes;
1284         rbd_dev->header.snapc->seq = snap_seq;
1285
1286         up_write(&rbd_dev->header.snap_rwsem);
1287
1288         return 0;
1289 }
1290
1291 static int rbd_init_disk(struct rbd_device *rbd_dev)
1292 {
1293         struct gendisk *disk;
1294         struct request_queue *q;
1295         int rc;
1296         u64 total_size = 0;
1297
1298         /* contact OSD, request size info about the object being mapped */
1299         rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1300         if (rc)
1301                 return rc;
1302
1303         rc = rbd_header_set_snap(rbd_dev, rbd_dev->snap_name, &total_size);
1304         if (rc)
1305                 return rc;
1306
1307         /* create gendisk info */
1308         rc = -ENOMEM;
1309         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1310         if (!disk)
1311                 goto out;
1312
1313         sprintf(disk->disk_name, DRV_NAME "%d", rbd_dev->id);
1314         disk->major = rbd_dev->major;
1315         disk->first_minor = 0;
1316         disk->fops = &rbd_bd_ops;
1317         disk->private_data = rbd_dev;
1318
1319         /* init rq */
1320         rc = -ENOMEM;
1321         q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1322         if (!q)
1323                 goto out_disk;
1324         blk_queue_merge_bvec(q, rbd_merge_bvec);
1325         disk->queue = q;
1326
1327         q->queuedata = rbd_dev;
1328
1329         rbd_dev->disk = disk;
1330         rbd_dev->q = q;
1331
1332         /* finally, announce the disk to the world */
1333         set_capacity(disk, total_size / 512ULL);
1334         add_disk(disk);
1335
1336         pr_info("%s: added with size 0x%llx\n",
1337                 disk->disk_name, (unsigned long long)total_size);
1338         return 0;
1339
1340 out_disk:
1341         put_disk(disk);
1342 out:
1343         return rc;
1344 }
1345
1346 /********************************************************************
1347  * /sys/class/rbd/
1348  *                   add        map rados objects to blkdev
1349  *                   remove     unmap rados objects
1350  *                   list       show mappings
1351  *******************************************************************/
1352
1353 static void class_rbd_release(struct class *cls)
1354 {
1355         kfree(cls);
1356 }
1357
1358 static ssize_t class_rbd_list(struct class *c,
1359                               struct class_attribute *attr,
1360                               char *data)
1361 {
1362         int n = 0;
1363         struct list_head *tmp;
1364         int max = PAGE_SIZE;
1365
1366         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1367
1368         n += snprintf(data, max,
1369                       "#id\tmajor\tclient_name\tpool\tname\tsnap\tKB\n");
1370
1371         list_for_each(tmp, &rbd_dev_list) {
1372                 struct rbd_device *rbd_dev;
1373
1374                 rbd_dev = list_entry(tmp, struct rbd_device, node);
1375                 n += snprintf(data+n, max-n,
1376                               "%d\t%d\tclient%lld\t%s\t%s\t%s\t%lld\n",
1377                               rbd_dev->id,
1378                               rbd_dev->major,
1379                               ceph_client_id(rbd_dev->client),
1380                               rbd_dev->pool_name,
1381                               rbd_dev->obj, rbd_dev->snap_name,
1382                               rbd_dev->header.image_size >> 10);
1383                 if (n == max)
1384                         break;
1385         }
1386
1387         mutex_unlock(&ctl_mutex);
1388         return n;
1389 }
1390
1391 static ssize_t class_rbd_add(struct class *c,
1392                              struct class_attribute *attr,
1393                              const char *buf, size_t count)
1394 {
1395         struct ceph_osd_client *osdc;
1396         struct rbd_device *rbd_dev;
1397         ssize_t rc = -ENOMEM;
1398         int irc, new_id = 0;
1399         struct list_head *tmp;
1400         char *mon_dev_name;
1401         char *options;
1402
1403         if (!try_module_get(THIS_MODULE))
1404                 return -ENODEV;
1405
1406         mon_dev_name = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
1407         if (!mon_dev_name)
1408                 goto err_out_mod;
1409
1410         options = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
1411         if (!options)
1412                 goto err_mon_dev;
1413
1414         /* new rbd_device object */
1415         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
1416         if (!rbd_dev)
1417                 goto err_out_opt;
1418
1419         /* static rbd_device initialization */
1420         spin_lock_init(&rbd_dev->lock);
1421         INIT_LIST_HEAD(&rbd_dev->node);
1422
1423         /* generate unique id: find highest unique id, add one */
1424         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1425
1426         list_for_each(tmp, &rbd_dev_list) {
1427                 struct rbd_device *rbd_dev;
1428
1429                 rbd_dev = list_entry(tmp, struct rbd_device, node);
1430                 if (rbd_dev->id >= new_id)
1431                         new_id = rbd_dev->id + 1;
1432         }
1433
1434         rbd_dev->id = new_id;
1435
1436         /* add to global list */
1437         list_add_tail(&rbd_dev->node, &rbd_dev_list);
1438
1439         /* parse add command */
1440         if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s "
1441                    "%" __stringify(RBD_MAX_OPT_LEN) "s "
1442                    "%" __stringify(RBD_MAX_POOL_NAME_LEN) "s "
1443                    "%" __stringify(RBD_MAX_OBJ_NAME_LEN) "s"
1444                    "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
1445                    mon_dev_name, options, rbd_dev->pool_name,
1446                    rbd_dev->obj, rbd_dev->snap_name) < 4) {
1447                 rc = -EINVAL;
1448                 goto err_out_slot;
1449         }
1450
1451         if (rbd_dev->snap_name[0] == 0)
1452                 rbd_dev->snap_name[0] = '-';
1453
1454         rbd_dev->obj_len = strlen(rbd_dev->obj);
1455         snprintf(rbd_dev->obj_md_name, sizeof(rbd_dev->obj_md_name), "%s%s",
1456                  rbd_dev->obj, RBD_SUFFIX);
1457
1458         /* initialize rest of new object */
1459         snprintf(rbd_dev->name, DEV_NAME_LEN, DRV_NAME "%d", rbd_dev->id);
1460         rc = rbd_get_client(rbd_dev, mon_dev_name, options);
1461         if (rc < 0)
1462                 goto err_out_slot;
1463
1464         mutex_unlock(&ctl_mutex);
1465
1466         /* pick the pool */
1467         osdc = &rbd_dev->client->osdc;
1468         rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
1469         if (rc < 0)
1470                 goto err_out_client;
1471         rbd_dev->poolid = rc;
1472
1473         /* register our block device */
1474         irc = register_blkdev(0, rbd_dev->name);
1475         if (irc < 0) {
1476                 rc = irc;
1477                 goto err_out_client;
1478         }
1479         rbd_dev->major = irc;
1480
1481         /* set up and announce blkdev mapping */
1482         rc = rbd_init_disk(rbd_dev);
1483         if (rc)
1484                 goto err_out_blkdev;
1485
1486         return count;
1487
1488 err_out_blkdev:
1489         unregister_blkdev(rbd_dev->major, rbd_dev->name);
1490 err_out_client:
1491         rbd_put_client(rbd_dev);
1492         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1493 err_out_slot:
1494         list_del_init(&rbd_dev->node);
1495         mutex_unlock(&ctl_mutex);
1496
1497         kfree(rbd_dev);
1498 err_out_opt:
1499         kfree(options);
1500 err_mon_dev:
1501         kfree(mon_dev_name);
1502 err_out_mod:
1503         dout("Error adding device %s\n", buf);
1504         module_put(THIS_MODULE);
1505         return rc;
1506 }
1507
1508 static struct rbd_device *__rbd_get_dev(unsigned long id)
1509 {
1510         struct list_head *tmp;
1511         struct rbd_device *rbd_dev;
1512
1513         list_for_each(tmp, &rbd_dev_list) {
1514                 rbd_dev = list_entry(tmp, struct rbd_device, node);
1515                 if (rbd_dev->id == id)
1516                         return rbd_dev;
1517         }
1518         return NULL;
1519 }
1520
1521 static ssize_t class_rbd_remove(struct class *c,
1522                                 struct class_attribute *attr,
1523                                 const char *buf,
1524                                 size_t count)
1525 {
1526         struct rbd_device *rbd_dev = NULL;
1527         int target_id, rc;
1528         unsigned long ul;
1529
1530         rc = strict_strtoul(buf, 10, &ul);
1531         if (rc)
1532                 return rc;
1533
1534         /* convert to int; abort if we lost anything in the conversion */
1535         target_id = (int) ul;
1536         if (target_id != ul)
1537                 return -EINVAL;
1538
1539         /* remove object from list immediately */
1540         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1541
1542         rbd_dev = __rbd_get_dev(target_id);
1543         if (rbd_dev)
1544                 list_del_init(&rbd_dev->node);
1545
1546         mutex_unlock(&ctl_mutex);
1547
1548         if (!rbd_dev)
1549                 return -ENOENT;
1550
1551         rbd_put_client(rbd_dev);
1552
1553         /* clean up and free blkdev */
1554         rbd_free_disk(rbd_dev);
1555         unregister_blkdev(rbd_dev->major, rbd_dev->name);
1556         kfree(rbd_dev);
1557
1558         /* release module ref */
1559         module_put(THIS_MODULE);
1560
1561         return count;
1562 }
1563
1564 static ssize_t class_rbd_snaps_list(struct class *c,
1565                               struct class_attribute *attr,
1566                               char *data)
1567 {
1568         struct rbd_device *rbd_dev = NULL;
1569         struct list_head *tmp;
1570         struct rbd_image_header *header;
1571         int i, n = 0, max = PAGE_SIZE;
1572         int ret;
1573
1574         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1575
1576         n += snprintf(data, max, "#id\tsnap\tKB\n");
1577
1578         list_for_each(tmp, &rbd_dev_list) {
1579                 char *names, *p;
1580                 struct ceph_snap_context *snapc;
1581
1582                 rbd_dev = list_entry(tmp, struct rbd_device, node);
1583                 header = &rbd_dev->header;
1584
1585                 down_read(&header->snap_rwsem);
1586
1587                 names = header->snap_names;
1588                 snapc = header->snapc;
1589
1590                 n += snprintf(data + n, max - n, "%d\t%s\t%lld%s\n",
1591                               rbd_dev->id, RBD_SNAP_HEAD_NAME,
1592                               header->image_size >> 10,
1593                               (!rbd_dev->cur_snap ? " (*)" : ""));
1594                 if (n == max)
1595                         break;
1596
1597                 p = names;
1598                 for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
1599                         n += snprintf(data + n, max - n, "%d\t%s\t%lld%s\n",
1600                               rbd_dev->id, p, header->snap_sizes[i] >> 10,
1601                               (rbd_dev->cur_snap &&
1602                                (snap_index(header, i) == rbd_dev->cur_snap) ?
1603                                " (*)" : ""));
1604                         if (n == max)
1605                                 break;
1606                 }
1607
1608                 up_read(&header->snap_rwsem);
1609         }
1610
1611
1612         ret = n;
1613         mutex_unlock(&ctl_mutex);
1614         return ret;
1615 }
1616
1617 static ssize_t class_rbd_snaps_refresh(struct class *c,
1618                                 struct class_attribute *attr,
1619                                 const char *buf,
1620                                 size_t count)
1621 {
1622         struct rbd_device *rbd_dev = NULL;
1623         int target_id, rc;
1624         unsigned long ul;
1625         int ret = count;
1626
1627         rc = strict_strtoul(buf, 10, &ul);
1628         if (rc)
1629                 return rc;
1630
1631         /* convert to int; abort if we lost anything in the conversion */
1632         target_id = (int) ul;
1633         if (target_id != ul)
1634                 return -EINVAL;
1635
1636         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1637
1638         rbd_dev = __rbd_get_dev(target_id);
1639         if (!rbd_dev) {
1640                 ret = -ENOENT;
1641                 goto done;
1642         }
1643
1644         rc = rbd_update_snaps(rbd_dev);
1645         if (rc < 0)
1646                 ret = rc;
1647
1648 done:
1649         mutex_unlock(&ctl_mutex);
1650         return ret;
1651 }
1652
1653 static ssize_t class_rbd_snap_create(struct class *c,
1654                                 struct class_attribute *attr,
1655                                 const char *buf,
1656                                 size_t count)
1657 {
1658         struct rbd_device *rbd_dev = NULL;
1659         int target_id, ret;
1660         char *name;
1661
1662         name = kmalloc(RBD_MAX_SNAP_NAME_LEN + 1, GFP_KERNEL);
1663         if (!name)
1664                 return -ENOMEM;
1665
1666         /* parse snaps add command */
1667         if (sscanf(buf, "%d "
1668                    "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
1669                    &target_id,
1670                    name) != 2) {
1671                 ret = -EINVAL;
1672                 goto done;
1673         }
1674
1675         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1676
1677         rbd_dev = __rbd_get_dev(target_id);
1678         if (!rbd_dev) {
1679                 ret = -ENOENT;
1680                 goto done_unlock;
1681         }
1682
1683         ret = rbd_header_add_snap(rbd_dev,
1684                                   name, GFP_KERNEL);
1685         if (ret < 0)
1686                 goto done_unlock;
1687
1688         ret = rbd_update_snaps(rbd_dev);
1689         if (ret < 0)
1690                 goto done_unlock;
1691
1692         ret = count;
1693 done_unlock:
1694         mutex_unlock(&ctl_mutex);
1695 done:
1696         kfree(name);
1697         return ret;
1698 }
1699
1700 static ssize_t class_rbd_rollback(struct class *c,
1701                                 struct class_attribute *attr,
1702                                 const char *buf,
1703                                 size_t count)
1704 {
1705         struct rbd_device *rbd_dev = NULL;
1706         int target_id, ret;
1707         u64 snapid;
1708         char snap_name[RBD_MAX_SNAP_NAME_LEN];
1709         u64 cur_ofs;
1710         char *seg_name;
1711
1712         /* parse snaps add command */
1713         if (sscanf(buf, "%d "
1714                    "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
1715                    &target_id,
1716                    snap_name) != 2) {
1717                 return -EINVAL;
1718         }
1719
1720         ret = -ENOMEM;
1721         seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1722         if (!seg_name)
1723                 return ret;
1724
1725         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1726
1727         rbd_dev = __rbd_get_dev(target_id);
1728         if (!rbd_dev) {
1729                 ret = -ENOENT;
1730                 goto done_unlock;
1731         }
1732
1733         ret = snap_by_name(&rbd_dev->header, snap_name, &snapid, NULL);
1734         if (ret < 0)
1735                 goto done_unlock;
1736
1737         dout("snapid=%lld\n", snapid);
1738
1739         cur_ofs = 0;
1740         while (cur_ofs < rbd_dev->header.image_size) {
1741                 cur_ofs += rbd_get_segment(&rbd_dev->header,
1742                                            rbd_dev->obj,
1743                                            cur_ofs, (u64)-1,
1744                                            seg_name, NULL);
1745                 dout("seg_name=%s\n", seg_name);
1746
1747                 ret = rbd_req_sync_rollback_obj(rbd_dev, snapid, seg_name);
1748                 if (ret < 0)
1749                         pr_warning("could not roll back obj %s err=%d\n",
1750                                    seg_name, ret);
1751         }
1752
1753         ret = rbd_update_snaps(rbd_dev);
1754         if (ret < 0)
1755                 goto done_unlock;
1756
1757         ret = count;
1758
1759 done_unlock:
1760         mutex_unlock(&ctl_mutex);
1761         kfree(seg_name);
1762
1763         return ret;
1764 }
1765
1766 static struct class_attribute class_rbd_attrs[] = {
1767         __ATTR(add,             0200, NULL, class_rbd_add),
1768         __ATTR(remove,          0200, NULL, class_rbd_remove),
1769         __ATTR(list,            0444, class_rbd_list, NULL),
1770         __ATTR(snaps_refresh,   0200, NULL, class_rbd_snaps_refresh),
1771         __ATTR(snap_create,     0200, NULL, class_rbd_snap_create),
1772         __ATTR(snaps_list,      0444, class_rbd_snaps_list, NULL),
1773         __ATTR(snap_rollback,   0200, NULL, class_rbd_rollback),
1774         __ATTR_NULL
1775 };
1776
1777 /*
1778  * create control files in sysfs
1779  * /sys/class/rbd/...
1780  */
1781 static int rbd_sysfs_init(void)
1782 {
1783         int ret = -ENOMEM;
1784
1785         class_rbd = kzalloc(sizeof(*class_rbd), GFP_KERNEL);
1786         if (!class_rbd)
1787                 goto out;
1788
1789         class_rbd->name = DRV_NAME;
1790         class_rbd->owner = THIS_MODULE;
1791         class_rbd->class_release = class_rbd_release;
1792         class_rbd->class_attrs = class_rbd_attrs;
1793
1794         ret = class_register(class_rbd);
1795         if (ret)
1796                 goto out_class;
1797         return 0;
1798
1799 out_class:
1800         kfree(class_rbd);
1801         class_rbd = NULL;
1802         pr_err(DRV_NAME ": failed to create class rbd\n");
1803 out:
1804         return ret;
1805 }
1806
1807 static void rbd_sysfs_cleanup(void)
1808 {
1809         if (class_rbd)
1810                 class_destroy(class_rbd);
1811         class_rbd = NULL;
1812 }
1813
1814 int __init rbd_init(void)
1815 {
1816         int rc;
1817
1818         rc = rbd_sysfs_init();
1819         if (rc)
1820                 return rc;
1821         spin_lock_init(&node_lock);
1822         pr_info("loaded " DRV_NAME_LONG "\n");
1823         return 0;
1824 }
1825
1826 void __exit rbd_exit(void)
1827 {
1828         rbd_sysfs_cleanup();
1829 }
1830
1831 module_init(rbd_init);
1832 module_exit(rbd_exit);
1833
1834 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
1835 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
1836 MODULE_DESCRIPTION("rados block device");
1837
1838 /* following authorship retained from original osdblk.c */
1839 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
1840
1841 MODULE_LICENSE("GPL");