]> git.openfabrics.org - ~shefty/rdma-dev.git/blob - drivers/block/rbd.c
rbd: replace the rbd sysfs interface
[~shefty/rdma-dev.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34
35 #include <linux/kernel.h>
36 #include <linux/device.h>
37 #include <linux/module.h>
38 #include <linux/fs.h>
39 #include <linux/blkdev.h>
40
41 #include "rbd_types.h"
42
43 #define DRV_NAME "rbd"
44 #define DRV_NAME_LONG "rbd (rados block device)"
45
46 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
47
48 #define RBD_MAX_MD_NAME_LEN     (96 + sizeof(RBD_SUFFIX))
49 #define RBD_MAX_POOL_NAME_LEN   64
50 #define RBD_MAX_SNAP_NAME_LEN   32
51 #define RBD_MAX_OPT_LEN         1024
52
53 #define RBD_SNAP_HEAD_NAME      "-"
54
55 #define DEV_NAME_LEN            32
56
57 /*
58  * block device image metadata (in-memory version)
59  */
60 struct rbd_image_header {
61         u64 image_size;
62         char block_name[32];
63         __u8 obj_order;
64         __u8 crypt_type;
65         __u8 comp_type;
66         struct rw_semaphore snap_rwsem;
67         struct ceph_snap_context *snapc;
68         size_t snap_names_len;
69         u64 snap_seq;
70         u32 total_snaps;
71
72         char *snap_names;
73         u64 *snap_sizes;
74 };
75
76 /*
77  * an instance of the client.  multiple devices may share a client.
78  */
79 struct rbd_client {
80         struct ceph_client      *client;
81         struct kref             kref;
82         struct list_head        node;
83 };
84
85 /*
86  * a single io request
87  */
88 struct rbd_request {
89         struct request          *rq;            /* blk layer request */
90         struct bio              *bio;           /* cloned bio */
91         struct page             **pages;        /* list of used pages */
92         u64                     len;
93 };
94
95 struct rbd_snap {
96         struct  device          dev;
97         const char              *name;
98         size_t                  size;
99         struct list_head        node;
100         u64                     id;
101 };
102
103 /*
104  * a single device
105  */
106 struct rbd_device {
107         int                     id;             /* blkdev unique id */
108
109         int                     major;          /* blkdev assigned major */
110         struct gendisk          *disk;          /* blkdev's gendisk and rq */
111         struct request_queue    *q;
112
113         struct ceph_client      *client;
114         struct rbd_client       *rbd_client;
115
116         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
117
118         spinlock_t              lock;           /* queue lock */
119
120         struct rbd_image_header header;
121         char                    obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
122         int                     obj_len;
123         char                    obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
124         char                    pool_name[RBD_MAX_POOL_NAME_LEN];
125         int                     poolid;
126
127         char                    snap_name[RBD_MAX_SNAP_NAME_LEN];
128         u32 cur_snap;   /* index+1 of current snapshot within snap context
129                            0 - for the head */
130         int read_only;
131
132         struct list_head        node;
133
134         /* list of snapshots */
135         struct list_head        snaps;
136
137         /* sysfs related */
138         struct device           dev;
139 };
140
141 static struct bus_type rbd_bus_type = {
142         .name           = "rbd",
143 };
144
145 static spinlock_t node_lock;      /* protects client get/put */
146
147 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
148 static LIST_HEAD(rbd_dev_list);    /* devices */
149 static LIST_HEAD(rbd_client_list);      /* clients */
150
151 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
152 static void rbd_dev_release(struct device *dev);
153 static ssize_t rbd_snap_rollback(struct device *dev,
154                                  struct device_attribute *attr,
155                                  const char *buf,
156                                  size_t size);
157 static ssize_t rbd_snap_add(struct device *dev,
158                             struct device_attribute *attr,
159                             const char *buf,
160                             size_t count);
161 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
162                                   struct rbd_snap *snap);;
163
164
165 static struct rbd_device *dev_to_rbd(struct device *dev)
166 {
167         return container_of(dev, struct rbd_device, dev);
168 }
169
170 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
171 {
172         return get_device(&rbd_dev->dev);
173 }
174
175 static void rbd_put_dev(struct rbd_device *rbd_dev)
176 {
177         put_device(&rbd_dev->dev);
178 }
179
180 static int rbd_open(struct block_device *bdev, fmode_t mode)
181 {
182         struct gendisk *disk = bdev->bd_disk;
183         struct rbd_device *rbd_dev = disk->private_data;
184
185         rbd_get_dev(rbd_dev);
186
187         set_device_ro(bdev, rbd_dev->read_only);
188
189         if ((mode & FMODE_WRITE) && rbd_dev->read_only)
190                 return -EROFS;
191
192         return 0;
193 }
194
195 static int rbd_release(struct gendisk *disk, fmode_t mode)
196 {
197         struct rbd_device *rbd_dev = disk->private_data;
198
199         rbd_put_dev(rbd_dev);
200
201         return 0;
202 }
203
204 static const struct block_device_operations rbd_bd_ops = {
205         .owner                  = THIS_MODULE,
206         .open                   = rbd_open,
207         .release                = rbd_release,
208 };
209
210 /*
211  * Initialize an rbd client instance.
212  * We own *opt.
213  */
214 static struct rbd_client *rbd_client_create(struct ceph_options *opt)
215 {
216         struct rbd_client *rbdc;
217         int ret = -ENOMEM;
218
219         dout("rbd_client_create\n");
220         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
221         if (!rbdc)
222                 goto out_opt;
223
224         kref_init(&rbdc->kref);
225         INIT_LIST_HEAD(&rbdc->node);
226
227         rbdc->client = ceph_create_client(opt, rbdc);
228         if (IS_ERR(rbdc->client))
229                 goto out_rbdc;
230         opt = NULL; /* Now rbdc->client is responsible for opt */
231
232         ret = ceph_open_session(rbdc->client);
233         if (ret < 0)
234                 goto out_err;
235
236         spin_lock(&node_lock);
237         list_add_tail(&rbdc->node, &rbd_client_list);
238         spin_unlock(&node_lock);
239
240         dout("rbd_client_create created %p\n", rbdc);
241         return rbdc;
242
243 out_err:
244         ceph_destroy_client(rbdc->client);
245 out_rbdc:
246         kfree(rbdc);
247 out_opt:
248         if (opt)
249                 ceph_destroy_options(opt);
250         return ERR_PTR(ret);
251 }
252
253 /*
254  * Find a ceph client with specific addr and configuration.
255  */
256 static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
257 {
258         struct rbd_client *client_node;
259
260         if (opt->flags & CEPH_OPT_NOSHARE)
261                 return NULL;
262
263         list_for_each_entry(client_node, &rbd_client_list, node)
264                 if (ceph_compare_options(opt, client_node->client) == 0)
265                         return client_node;
266         return NULL;
267 }
268
269 /*
270  * Get a ceph client with specific addr and configuration, if one does
271  * not exist create it.
272  */
273 static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
274                           char *options)
275 {
276         struct rbd_client *rbdc;
277         struct ceph_options *opt;
278         int ret;
279
280         ret = ceph_parse_options(&opt, options, mon_addr,
281                                  mon_addr + strlen(mon_addr), NULL, NULL);
282         if (ret < 0)
283                 return ret;
284
285         spin_lock(&node_lock);
286         rbdc = __rbd_client_find(opt);
287         if (rbdc) {
288                 ceph_destroy_options(opt);
289
290                 /* using an existing client */
291                 kref_get(&rbdc->kref);
292                 rbd_dev->rbd_client = rbdc;
293                 rbd_dev->client = rbdc->client;
294                 spin_unlock(&node_lock);
295                 return 0;
296         }
297         spin_unlock(&node_lock);
298
299         rbdc = rbd_client_create(opt);
300         if (IS_ERR(rbdc))
301                 return PTR_ERR(rbdc);
302
303         rbd_dev->rbd_client = rbdc;
304         rbd_dev->client = rbdc->client;
305         return 0;
306 }
307
308 /*
309  * Destroy ceph client
310  */
311 static void rbd_client_release(struct kref *kref)
312 {
313         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
314
315         dout("rbd_release_client %p\n", rbdc);
316         spin_lock(&node_lock);
317         list_del(&rbdc->node);
318         spin_unlock(&node_lock);
319
320         ceph_destroy_client(rbdc->client);
321         kfree(rbdc);
322 }
323
324 /*
325  * Drop reference to ceph client node. If it's not referenced anymore, release
326  * it.
327  */
328 static void rbd_put_client(struct rbd_device *rbd_dev)
329 {
330         kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
331         rbd_dev->rbd_client = NULL;
332         rbd_dev->client = NULL;
333 }
334
335
336 /*
337  * Create a new header structure, translate header format from the on-disk
338  * header.
339  */
340 static int rbd_header_from_disk(struct rbd_image_header *header,
341                                  struct rbd_image_header_ondisk *ondisk,
342                                  int allocated_snaps,
343                                  gfp_t gfp_flags)
344 {
345         int i;
346         u32 snap_count = le32_to_cpu(ondisk->snap_count);
347         int ret = -ENOMEM;
348
349         init_rwsem(&header->snap_rwsem);
350         header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
351         header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
352                                 snap_count *
353                                  sizeof(struct rbd_image_snap_ondisk),
354                                 gfp_flags);
355         if (!header->snapc)
356                 return -ENOMEM;
357         if (snap_count) {
358                 header->snap_names = kmalloc(header->snap_names_len,
359                                              GFP_KERNEL);
360                 if (!header->snap_names)
361                         goto err_snapc;
362                 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
363                                              GFP_KERNEL);
364                 if (!header->snap_sizes)
365                         goto err_names;
366         } else {
367                 header->snap_names = NULL;
368                 header->snap_sizes = NULL;
369         }
370         memcpy(header->block_name, ondisk->block_name,
371                sizeof(ondisk->block_name));
372
373         header->image_size = le64_to_cpu(ondisk->image_size);
374         header->obj_order = ondisk->options.order;
375         header->crypt_type = ondisk->options.crypt_type;
376         header->comp_type = ondisk->options.comp_type;
377
378         atomic_set(&header->snapc->nref, 1);
379         header->snap_seq = le64_to_cpu(ondisk->snap_seq);
380         header->snapc->num_snaps = snap_count;
381         header->total_snaps = snap_count;
382
383         if (snap_count &&
384             allocated_snaps == snap_count) {
385                 for (i = 0; i < snap_count; i++) {
386                         header->snapc->snaps[i] =
387                                 le64_to_cpu(ondisk->snaps[i].id);
388                         header->snap_sizes[i] =
389                                 le64_to_cpu(ondisk->snaps[i].image_size);
390                 }
391
392                 /* copy snapshot names */
393                 memcpy(header->snap_names, &ondisk->snaps[i],
394                         header->snap_names_len);
395         }
396
397         return 0;
398
399 err_names:
400         kfree(header->snap_names);
401 err_snapc:
402         kfree(header->snapc);
403         return ret;
404 }
405
406 static int snap_index(struct rbd_image_header *header, int snap_num)
407 {
408         return header->total_snaps - snap_num;
409 }
410
411 static u64 cur_snap_id(struct rbd_device *rbd_dev)
412 {
413         struct rbd_image_header *header = &rbd_dev->header;
414
415         if (!rbd_dev->cur_snap)
416                 return 0;
417
418         return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
419 }
420
421 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
422                         u64 *seq, u64 *size)
423 {
424         int i;
425         char *p = header->snap_names;
426
427         for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
428                 if (strcmp(snap_name, p) == 0)
429                         break;
430         }
431         if (i == header->total_snaps)
432                 return -ENOENT;
433         if (seq)
434                 *seq = header->snapc->snaps[i];
435
436         if (size)
437                 *size = header->snap_sizes[i];
438
439         return i;
440 }
441
442 static int rbd_header_set_snap(struct rbd_device *dev,
443                                const char *snap_name,
444                                u64 *size)
445 {
446         struct rbd_image_header *header = &dev->header;
447         struct ceph_snap_context *snapc = header->snapc;
448         int ret = -ENOENT;
449
450         down_write(&header->snap_rwsem);
451
452         if (!snap_name ||
453             !*snap_name ||
454             strcmp(snap_name, "-") == 0 ||
455             strcmp(snap_name, RBD_SNAP_HEAD_NAME) == 0) {
456                 if (header->total_snaps)
457                         snapc->seq = header->snap_seq;
458                 else
459                         snapc->seq = 0;
460                 dev->cur_snap = 0;
461                 dev->read_only = 0;
462                 if (size)
463                         *size = header->image_size;
464         } else {
465                 ret = snap_by_name(header, snap_name, &snapc->seq, size);
466                 if (ret < 0)
467                         goto done;
468
469                 dev->cur_snap = header->total_snaps - ret;
470                 dev->read_only = 1;
471         }
472
473         ret = 0;
474 done:
475         up_write(&header->snap_rwsem);
476         return ret;
477 }
478
479 static void rbd_header_free(struct rbd_image_header *header)
480 {
481         kfree(header->snapc);
482         kfree(header->snap_names);
483         kfree(header->snap_sizes);
484 }
485
486 /*
487  * get the actual striped segment name, offset and length
488  */
489 static u64 rbd_get_segment(struct rbd_image_header *header,
490                            const char *block_name,
491                            u64 ofs, u64 len,
492                            char *seg_name, u64 *segofs)
493 {
494         u64 seg = ofs >> header->obj_order;
495
496         if (seg_name)
497                 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
498                          "%s.%012llx", block_name, seg);
499
500         ofs = ofs & ((1 << header->obj_order) - 1);
501         len = min_t(u64, len, (1 << header->obj_order) - ofs);
502
503         if (segofs)
504                 *segofs = ofs;
505
506         return len;
507 }
508
509 /*
510  * bio helpers
511  */
512
513 static void bio_chain_put(struct bio *chain)
514 {
515         struct bio *tmp;
516
517         while (chain) {
518                 tmp = chain;
519                 chain = chain->bi_next;
520                 bio_put(tmp);
521         }
522 }
523
524 /*
525  * zeros a bio chain, starting at specific offset
526  */
527 static void zero_bio_chain(struct bio *chain, int start_ofs)
528 {
529         struct bio_vec *bv;
530         unsigned long flags;
531         void *buf;
532         int i;
533         int pos = 0;
534
535         while (chain) {
536                 bio_for_each_segment(bv, chain, i) {
537                         if (pos + bv->bv_len > start_ofs) {
538                                 int remainder = max(start_ofs - pos, 0);
539                                 buf = bvec_kmap_irq(bv, &flags);
540                                 memset(buf + remainder, 0,
541                                        bv->bv_len - remainder);
542                                 bvec_kunmap_irq(buf, &flags);
543                         }
544                         pos += bv->bv_len;
545                 }
546
547                 chain = chain->bi_next;
548         }
549 }
550
551 /*
552  * bio_chain_clone - clone a chain of bios up to a certain length.
553  * might return a bio_pair that will need to be released.
554  */
555 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
556                                    struct bio_pair **bp,
557                                    int len, gfp_t gfpmask)
558 {
559         struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
560         int total = 0;
561
562         if (*bp) {
563                 bio_pair_release(*bp);
564                 *bp = NULL;
565         }
566
567         while (old_chain && (total < len)) {
568                 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
569                 if (!tmp)
570                         goto err_out;
571
572                 if (total + old_chain->bi_size > len) {
573                         struct bio_pair *bp;
574
575                         /*
576                          * this split can only happen with a single paged bio,
577                          * split_bio will BUG_ON if this is not the case
578                          */
579                         dout("bio_chain_clone split! total=%d remaining=%d"
580                              "bi_size=%d\n",
581                              (int)total, (int)len-total,
582                              (int)old_chain->bi_size);
583
584                         /* split the bio. We'll release it either in the next
585                            call, or it will have to be released outside */
586                         bp = bio_split(old_chain, (len - total) / 512ULL);
587                         if (!bp)
588                                 goto err_out;
589
590                         __bio_clone(tmp, &bp->bio1);
591
592                         *next = &bp->bio2;
593                 } else {
594                         __bio_clone(tmp, old_chain);
595                         *next = old_chain->bi_next;
596                 }
597
598                 tmp->bi_bdev = NULL;
599                 gfpmask &= ~__GFP_WAIT;
600                 tmp->bi_next = NULL;
601
602                 if (!new_chain) {
603                         new_chain = tail = tmp;
604                 } else {
605                         tail->bi_next = tmp;
606                         tail = tmp;
607                 }
608                 old_chain = old_chain->bi_next;
609
610                 total += tmp->bi_size;
611         }
612
613         BUG_ON(total < len);
614
615         if (tail)
616                 tail->bi_next = NULL;
617
618         *old = old_chain;
619
620         return new_chain;
621
622 err_out:
623         dout("bio_chain_clone with err\n");
624         bio_chain_put(new_chain);
625         return NULL;
626 }
627
628 /*
629  * helpers for osd request op vectors.
630  */
631 static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
632                             int num_ops,
633                             int opcode,
634                             u32 payload_len)
635 {
636         *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
637                        GFP_NOIO);
638         if (!*ops)
639                 return -ENOMEM;
640         (*ops)[0].op = opcode;
641         /*
642          * op extent offset and length will be set later on
643          * in calc_raw_layout()
644          */
645         (*ops)[0].payload_len = payload_len;
646         return 0;
647 }
648
649 static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
650 {
651         kfree(ops);
652 }
653
654 /*
655  * Send ceph osd request
656  */
657 static int rbd_do_request(struct request *rq,
658                           struct rbd_device *dev,
659                           struct ceph_snap_context *snapc,
660                           u64 snapid,
661                           const char *obj, u64 ofs, u64 len,
662                           struct bio *bio,
663                           struct page **pages,
664                           int num_pages,
665                           int flags,
666                           struct ceph_osd_req_op *ops,
667                           int num_reply,
668                           void (*rbd_cb)(struct ceph_osd_request *req,
669                                          struct ceph_msg *msg))
670 {
671         struct ceph_osd_request *req;
672         struct ceph_file_layout *layout;
673         int ret;
674         u64 bno;
675         struct timespec mtime = CURRENT_TIME;
676         struct rbd_request *req_data;
677         struct ceph_osd_request_head *reqhead;
678         struct rbd_image_header *header = &dev->header;
679
680         ret = -ENOMEM;
681         req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
682         if (!req_data)
683                 goto done;
684
685         dout("rbd_do_request len=%lld ofs=%lld\n", len, ofs);
686
687         down_read(&header->snap_rwsem);
688
689         req = ceph_osdc_alloc_request(&dev->client->osdc, flags,
690                                       snapc,
691                                       ops,
692                                       false,
693                                       GFP_NOIO, pages, bio);
694         if (IS_ERR(req)) {
695                 up_read(&header->snap_rwsem);
696                 ret = PTR_ERR(req);
697                 goto done_pages;
698         }
699
700         req->r_callback = rbd_cb;
701
702         req_data->rq = rq;
703         req_data->bio = bio;
704         req_data->pages = pages;
705         req_data->len = len;
706
707         req->r_priv = req_data;
708
709         reqhead = req->r_request->front.iov_base;
710         reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
711
712         strncpy(req->r_oid, obj, sizeof(req->r_oid));
713         req->r_oid_len = strlen(req->r_oid);
714
715         layout = &req->r_file_layout;
716         memset(layout, 0, sizeof(*layout));
717         layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
718         layout->fl_stripe_count = cpu_to_le32(1);
719         layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
720         layout->fl_pg_preferred = cpu_to_le32(-1);
721         layout->fl_pg_pool = cpu_to_le32(dev->poolid);
722         ceph_calc_raw_layout(&dev->client->osdc, layout, snapid,
723                              ofs, &len, &bno, req, ops);
724
725         ceph_osdc_build_request(req, ofs, &len,
726                                 ops,
727                                 snapc,
728                                 &mtime,
729                                 req->r_oid, req->r_oid_len);
730         up_read(&header->snap_rwsem);
731
732         ret = ceph_osdc_start_request(&dev->client->osdc, req, false);
733         if (ret < 0)
734                 goto done_err;
735
736         if (!rbd_cb) {
737                 ret = ceph_osdc_wait_request(&dev->client->osdc, req);
738                 ceph_osdc_put_request(req);
739         }
740         return ret;
741
742 done_err:
743         bio_chain_put(req_data->bio);
744         ceph_osdc_put_request(req);
745 done_pages:
746         kfree(req_data);
747 done:
748         if (rq)
749                 blk_end_request(rq, ret, len);
750         return ret;
751 }
752
753 /*
754  * Ceph osd op callback
755  */
756 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
757 {
758         struct rbd_request *req_data = req->r_priv;
759         struct ceph_osd_reply_head *replyhead;
760         struct ceph_osd_op *op;
761         __s32 rc;
762         u64 bytes;
763         int read_op;
764
765         /* parse reply */
766         replyhead = msg->front.iov_base;
767         WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
768         op = (void *)(replyhead + 1);
769         rc = le32_to_cpu(replyhead->result);
770         bytes = le64_to_cpu(op->extent.length);
771         read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);
772
773         dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
774
775         if (rc == -ENOENT && read_op) {
776                 zero_bio_chain(req_data->bio, 0);
777                 rc = 0;
778         } else if (rc == 0 && read_op && bytes < req_data->len) {
779                 zero_bio_chain(req_data->bio, bytes);
780                 bytes = req_data->len;
781         }
782
783         blk_end_request(req_data->rq, rc, bytes);
784
785         if (req_data->bio)
786                 bio_chain_put(req_data->bio);
787
788         ceph_osdc_put_request(req);
789         kfree(req_data);
790 }
791
792 /*
793  * Do a synchronous ceph osd operation
794  */
795 static int rbd_req_sync_op(struct rbd_device *dev,
796                            struct ceph_snap_context *snapc,
797                            u64 snapid,
798                            int opcode,
799                            int flags,
800                            struct ceph_osd_req_op *orig_ops,
801                            int num_reply,
802                            const char *obj,
803                            u64 ofs, u64 len,
804                            char *buf)
805 {
806         int ret;
807         struct page **pages;
808         int num_pages;
809         struct ceph_osd_req_op *ops = orig_ops;
810         u32 payload_len;
811
812         num_pages = calc_pages_for(ofs , len);
813         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
814         if (IS_ERR(pages))
815                 return PTR_ERR(pages);
816
817         if (!orig_ops) {
818                 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
819                 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
820                 if (ret < 0)
821                         goto done;
822
823                 if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
824                         ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
825                         if (ret < 0)
826                                 goto done_ops;
827                 }
828         }
829
830         ret = rbd_do_request(NULL, dev, snapc, snapid,
831                           obj, ofs, len, NULL,
832                           pages, num_pages,
833                           flags,
834                           ops,
835                           2,
836                           NULL);
837         if (ret < 0)
838                 goto done_ops;
839
840         if ((flags & CEPH_OSD_FLAG_READ) && buf)
841                 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
842
843 done_ops:
844         if (!orig_ops)
845                 rbd_destroy_ops(ops);
846 done:
847         ceph_release_page_vector(pages, num_pages);
848         return ret;
849 }
850
851 /*
852  * Do an asynchronous ceph osd operation
853  */
854 static int rbd_do_op(struct request *rq,
855                      struct rbd_device *rbd_dev ,
856                      struct ceph_snap_context *snapc,
857                      u64 snapid,
858                      int opcode, int flags, int num_reply,
859                      u64 ofs, u64 len,
860                      struct bio *bio)
861 {
862         char *seg_name;
863         u64 seg_ofs;
864         u64 seg_len;
865         int ret;
866         struct ceph_osd_req_op *ops;
867         u32 payload_len;
868
869         seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
870         if (!seg_name)
871                 return -ENOMEM;
872
873         seg_len = rbd_get_segment(&rbd_dev->header,
874                                   rbd_dev->header.block_name,
875                                   ofs, len,
876                                   seg_name, &seg_ofs);
877
878         payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
879
880         ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
881         if (ret < 0)
882                 goto done;
883
884         /* we've taken care of segment sizes earlier when we
885            cloned the bios. We should never have a segment
886            truncated at this point */
887         BUG_ON(seg_len < len);
888
889         ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
890                              seg_name, seg_ofs, seg_len,
891                              bio,
892                              NULL, 0,
893                              flags,
894                              ops,
895                              num_reply,
896                              rbd_req_cb);
897 done:
898         kfree(seg_name);
899         return ret;
900 }
901
902 /*
903  * Request async osd write
904  */
905 static int rbd_req_write(struct request *rq,
906                          struct rbd_device *rbd_dev,
907                          struct ceph_snap_context *snapc,
908                          u64 ofs, u64 len,
909                          struct bio *bio)
910 {
911         return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
912                          CEPH_OSD_OP_WRITE,
913                          CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
914                          2,
915                          ofs, len, bio);
916 }
917
918 /*
919  * Request async osd read
920  */
921 static int rbd_req_read(struct request *rq,
922                          struct rbd_device *rbd_dev,
923                          u64 snapid,
924                          u64 ofs, u64 len,
925                          struct bio *bio)
926 {
927         return rbd_do_op(rq, rbd_dev, NULL,
928                          (snapid ? snapid : CEPH_NOSNAP),
929                          CEPH_OSD_OP_READ,
930                          CEPH_OSD_FLAG_READ,
931                          2,
932                          ofs, len, bio);
933 }
934
935 /*
936  * Request sync osd read
937  */
938 static int rbd_req_sync_read(struct rbd_device *dev,
939                           struct ceph_snap_context *snapc,
940                           u64 snapid,
941                           const char *obj,
942                           u64 ofs, u64 len,
943                           char *buf)
944 {
945         return rbd_req_sync_op(dev, NULL,
946                                (snapid ? snapid : CEPH_NOSNAP),
947                                CEPH_OSD_OP_READ,
948                                CEPH_OSD_FLAG_READ,
949                                NULL,
950                                1, obj, ofs, len, buf);
951 }
952
953 /*
954  * Request sync osd read
955  */
956 static int rbd_req_sync_rollback_obj(struct rbd_device *dev,
957                                      u64 snapid,
958                                      const char *obj)
959 {
960         struct ceph_osd_req_op *ops;
961         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_ROLLBACK, 0);
962         if (ret < 0)
963                 return ret;
964
965         ops[0].snap.snapid = snapid;
966
967         ret = rbd_req_sync_op(dev, NULL,
968                                CEPH_NOSNAP,
969                                0,
970                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
971                                ops,
972                                1, obj, 0, 0, NULL);
973
974         rbd_destroy_ops(ops);
975
976         if (ret < 0)
977                 return ret;
978
979         return ret;
980 }
981
982 /*
983  * Request sync osd read
984  */
985 static int rbd_req_sync_exec(struct rbd_device *dev,
986                              const char *obj,
987                              const char *cls,
988                              const char *method,
989                              const char *data,
990                              int len)
991 {
992         struct ceph_osd_req_op *ops;
993         int cls_len = strlen(cls);
994         int method_len = strlen(method);
995         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
996                                     cls_len + method_len + len);
997         if (ret < 0)
998                 return ret;
999
1000         ops[0].cls.class_name = cls;
1001         ops[0].cls.class_len = (__u8)cls_len;
1002         ops[0].cls.method_name = method;
1003         ops[0].cls.method_len = (__u8)method_len;
1004         ops[0].cls.argc = 0;
1005         ops[0].cls.indata = data;
1006         ops[0].cls.indata_len = len;
1007
1008         ret = rbd_req_sync_op(dev, NULL,
1009                                CEPH_NOSNAP,
1010                                0,
1011                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1012                                ops,
1013                                1, obj, 0, 0, NULL);
1014
1015         rbd_destroy_ops(ops);
1016
1017         dout("cls_exec returned %d\n", ret);
1018         return ret;
1019 }
1020
1021 /*
1022  * block device queue callback
1023  */
1024 static void rbd_rq_fn(struct request_queue *q)
1025 {
1026         struct rbd_device *rbd_dev = q->queuedata;
1027         struct request *rq;
1028         struct bio_pair *bp = NULL;
1029
1030         rq = blk_fetch_request(q);
1031
1032         while (1) {
1033                 struct bio *bio;
1034                 struct bio *rq_bio, *next_bio = NULL;
1035                 bool do_write;
1036                 int size, op_size = 0;
1037                 u64 ofs;
1038
1039                 /* peek at request from block layer */
1040                 if (!rq)
1041                         break;
1042
1043                 dout("fetched request\n");
1044
1045                 /* filter out block requests we don't understand */
1046                 if ((rq->cmd_type != REQ_TYPE_FS)) {
1047                         __blk_end_request_all(rq, 0);
1048                         goto next;
1049                 }
1050
1051                 /* deduce our operation (read, write) */
1052                 do_write = (rq_data_dir(rq) == WRITE);
1053
1054                 size = blk_rq_bytes(rq);
1055                 ofs = blk_rq_pos(rq) * 512ULL;
1056                 rq_bio = rq->bio;
1057                 if (do_write && rbd_dev->read_only) {
1058                         __blk_end_request_all(rq, -EROFS);
1059                         goto next;
1060                 }
1061
1062                 spin_unlock_irq(q->queue_lock);
1063
1064                 dout("%s 0x%x bytes at 0x%llx\n",
1065                      do_write ? "write" : "read",
1066                      size, blk_rq_pos(rq) * 512ULL);
1067
1068                 do {
1069                         /* a bio clone to be passed down to OSD req */
1070                         dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1071                         op_size = rbd_get_segment(&rbd_dev->header,
1072                                                   rbd_dev->header.block_name,
1073                                                   ofs, size,
1074                                                   NULL, NULL);
1075                         bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1076                                               op_size, GFP_ATOMIC);
1077                         if (!bio) {
1078                                 spin_lock_irq(q->queue_lock);
1079                                 __blk_end_request_all(rq, -ENOMEM);
1080                                 goto next;
1081                         }
1082
1083                         /* init OSD command: write or read */
1084                         if (do_write)
1085                                 rbd_req_write(rq, rbd_dev,
1086                                               rbd_dev->header.snapc,
1087                                               ofs,
1088                                               op_size, bio);
1089                         else
1090                                 rbd_req_read(rq, rbd_dev,
1091                                              cur_snap_id(rbd_dev),
1092                                              ofs,
1093                                              op_size, bio);
1094
1095                         size -= op_size;
1096                         ofs += op_size;
1097
1098                         rq_bio = next_bio;
1099                 } while (size > 0);
1100
1101                 if (bp)
1102                         bio_pair_release(bp);
1103
1104                 spin_lock_irq(q->queue_lock);
1105 next:
1106                 rq = blk_fetch_request(q);
1107         }
1108 }
1109
1110 /*
1111  * a queue callback. Makes sure that we don't create a bio that spans across
1112  * multiple osd objects. One exception would be with a single page bios,
1113  * which we handle later at bio_chain_clone
1114  */
1115 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1116                           struct bio_vec *bvec)
1117 {
1118         struct rbd_device *rbd_dev = q->queuedata;
1119         unsigned int chunk_sectors = 1 << (rbd_dev->header.obj_order - 9);
1120         sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1121         unsigned int bio_sectors = bmd->bi_size >> 9;
1122         int max;
1123
1124         max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
1125                                  + bio_sectors)) << 9;
1126         if (max < 0)
1127                 max = 0; /* bio_add cannot handle a negative return */
1128         if (max <= bvec->bv_len && bio_sectors == 0)
1129                 return bvec->bv_len;
1130         return max;
1131 }
1132
1133 static void rbd_free_disk(struct rbd_device *rbd_dev)
1134 {
1135         struct gendisk *disk = rbd_dev->disk;
1136
1137         if (!disk)
1138                 return;
1139
1140         rbd_header_free(&rbd_dev->header);
1141
1142         if (disk->flags & GENHD_FL_UP)
1143                 del_gendisk(disk);
1144         if (disk->queue)
1145                 blk_cleanup_queue(disk->queue);
1146         put_disk(disk);
1147 }
1148
1149 /*
1150  * reload the ondisk the header 
1151  */
1152 static int rbd_read_header(struct rbd_device *rbd_dev,
1153                            struct rbd_image_header *header)
1154 {
1155         ssize_t rc;
1156         struct rbd_image_header_ondisk *dh;
1157         int snap_count = 0;
1158         u64 snap_names_len = 0;
1159
1160         while (1) {
1161                 int len = sizeof(*dh) +
1162                           snap_count * sizeof(struct rbd_image_snap_ondisk) +
1163                           snap_names_len;
1164
1165                 rc = -ENOMEM;
1166                 dh = kmalloc(len, GFP_KERNEL);
1167                 if (!dh)
1168                         return -ENOMEM;
1169
1170                 rc = rbd_req_sync_read(rbd_dev,
1171                                        NULL, CEPH_NOSNAP,
1172                                        rbd_dev->obj_md_name,
1173                                        0, len,
1174                                        (char *)dh);
1175                 if (rc < 0)
1176                         goto out_dh;
1177
1178                 rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1179                 if (rc < 0)
1180                         goto out_dh;
1181
1182                 if (snap_count != header->total_snaps) {
1183                         snap_count = header->total_snaps;
1184                         snap_names_len = header->snap_names_len;
1185                         rbd_header_free(header);
1186                         kfree(dh);
1187                         continue;
1188                 }
1189                 break;
1190         }
1191
1192 out_dh:
1193         kfree(dh);
1194         return rc;
1195 }
1196
1197 /*
1198  * create a snapshot
1199  */
1200 static int rbd_header_add_snap(struct rbd_device *dev,
1201                                const char *snap_name,
1202                                gfp_t gfp_flags)
1203 {
1204         int name_len = strlen(snap_name);
1205         u64 new_snapid;
1206         int ret;
1207         void *data, *data_start, *data_end;
1208
1209         /* we should create a snapshot only if we're pointing at the head */
1210         if (dev->cur_snap)
1211                 return -EINVAL;
1212
1213         ret = ceph_monc_create_snapid(&dev->client->monc, dev->poolid,
1214                                       &new_snapid);
1215         dout("created snapid=%lld\n", new_snapid);
1216         if (ret < 0)
1217                 return ret;
1218
1219         data = kmalloc(name_len + 16, gfp_flags);
1220         if (!data)
1221                 return -ENOMEM;
1222
1223         data_start = data;
1224         data_end = data + name_len + 16;
1225
1226         ceph_encode_string_safe(&data, data_end, snap_name, name_len, bad);
1227         ceph_encode_64_safe(&data, data_end, new_snapid, bad);
1228
1229         ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
1230                                 data_start, data - data_start);
1231
1232         kfree(data_start);
1233
1234         if (ret < 0)
1235                 return ret;
1236
1237         dev->header.snapc->seq =  new_snapid;
1238
1239         return 0;
1240 bad:
1241         return -ERANGE;
1242 }
1243
1244 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1245 {
1246         struct rbd_snap *snap;
1247
1248         while (!list_empty(&rbd_dev->snaps)) {
1249                 snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1250                 __rbd_remove_snap_dev(rbd_dev, snap);
1251         }
1252 }
1253
1254 /*
1255  * only read the first part of the ondisk header, without the snaps info
1256  */
1257 static int __rbd_update_snaps(struct rbd_device *rbd_dev)
1258 {
1259         int ret;
1260         struct rbd_image_header h;
1261         u64 snap_seq;
1262
1263         ret = rbd_read_header(rbd_dev, &h);
1264         if (ret < 0)
1265                 return ret;
1266
1267         down_write(&rbd_dev->header.snap_rwsem);
1268
1269         snap_seq = rbd_dev->header.snapc->seq;
1270
1271         kfree(rbd_dev->header.snapc);
1272         kfree(rbd_dev->header.snap_names);
1273         kfree(rbd_dev->header.snap_sizes);
1274
1275         rbd_dev->header.total_snaps = h.total_snaps;
1276         rbd_dev->header.snapc = h.snapc;
1277         rbd_dev->header.snap_names = h.snap_names;
1278         rbd_dev->header.snap_names_len = h.snap_names_len;
1279         rbd_dev->header.snap_sizes = h.snap_sizes;
1280         rbd_dev->header.snapc->seq = snap_seq;
1281
1282         ret = __rbd_init_snaps_header(rbd_dev);
1283
1284         up_write(&rbd_dev->header.snap_rwsem);
1285
1286         return ret;
1287 }
1288
1289 static int rbd_init_disk(struct rbd_device *rbd_dev)
1290 {
1291         struct gendisk *disk;
1292         struct request_queue *q;
1293         int rc;
1294         u64 total_size = 0;
1295
1296         /* contact OSD, request size info about the object being mapped */
1297         rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1298         if (rc)
1299                 return rc;
1300
1301         /* no need to lock here, as rbd_dev is not registered yet */
1302         rc = __rbd_init_snaps_header(rbd_dev);
1303         if (rc)
1304                 return rc;
1305
1306         rc = rbd_header_set_snap(rbd_dev, rbd_dev->snap_name, &total_size);
1307         if (rc)
1308                 return rc;
1309
1310         /* create gendisk info */
1311         rc = -ENOMEM;
1312         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1313         if (!disk)
1314                 goto out;
1315
1316         sprintf(disk->disk_name, DRV_NAME "%d", rbd_dev->id);
1317         disk->major = rbd_dev->major;
1318         disk->first_minor = 0;
1319         disk->fops = &rbd_bd_ops;
1320         disk->private_data = rbd_dev;
1321
1322         /* init rq */
1323         rc = -ENOMEM;
1324         q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1325         if (!q)
1326                 goto out_disk;
1327         blk_queue_merge_bvec(q, rbd_merge_bvec);
1328         disk->queue = q;
1329
1330         q->queuedata = rbd_dev;
1331
1332         rbd_dev->disk = disk;
1333         rbd_dev->q = q;
1334
1335         /* finally, announce the disk to the world */
1336         set_capacity(disk, total_size / 512ULL);
1337         add_disk(disk);
1338
1339         pr_info("%s: added with size 0x%llx\n",
1340                 disk->disk_name, (unsigned long long)total_size);
1341         return 0;
1342
1343 out_disk:
1344         put_disk(disk);
1345 out:
1346         return rc;
1347 }
1348
1349 /*
1350   sysfs
1351 */
1352
1353 static ssize_t rbd_size_show(struct device *dev,
1354                              struct device_attribute *attr, char *buf)
1355 {
1356         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1357
1358         return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
1359 }
1360
1361 static ssize_t rbd_major_show(struct device *dev,
1362                               struct device_attribute *attr, char *buf)
1363 {
1364         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1365
1366         return sprintf(buf, "%d\n", rbd_dev->major);
1367 }
1368
1369 static ssize_t rbd_client_id_show(struct device *dev,
1370                                   struct device_attribute *attr, char *buf)
1371 {
1372         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1373
1374         return sprintf(buf, "client%lld\n", ceph_client_id(rbd_dev->client));
1375 }
1376
1377 static ssize_t rbd_pool_show(struct device *dev,
1378                              struct device_attribute *attr, char *buf)
1379 {
1380         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1381
1382         return sprintf(buf, "%s\n", rbd_dev->pool_name);
1383 }
1384
1385 static ssize_t rbd_name_show(struct device *dev,
1386                              struct device_attribute *attr, char *buf)
1387 {
1388         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1389
1390         return sprintf(buf, "%s\n", rbd_dev->obj);
1391 }
1392
1393 static ssize_t rbd_snap_show(struct device *dev,
1394                              struct device_attribute *attr,
1395                              char *buf)
1396 {
1397         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1398
1399         return sprintf(buf, "%s\n", rbd_dev->snap_name);
1400 }
1401
1402 static ssize_t rbd_image_refresh(struct device *dev,
1403                                  struct device_attribute *attr,
1404                                  const char *buf,
1405                                  size_t size)
1406 {
1407         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1408         int rc;
1409         int ret = size;
1410
1411         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1412
1413         rc = __rbd_update_snaps(rbd_dev);
1414         if (rc < 0)
1415                 ret = rc;
1416
1417         mutex_unlock(&ctl_mutex);
1418         return ret;
1419 }
1420
1421 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1422 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1423 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1424 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1425 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1426 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1427 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1428 static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
1429 static DEVICE_ATTR(rollback_snap, S_IWUSR, NULL, rbd_snap_rollback);
1430
1431 static struct attribute *rbd_attrs[] = {
1432         &dev_attr_size.attr,
1433         &dev_attr_major.attr,
1434         &dev_attr_client_id.attr,
1435         &dev_attr_pool.attr,
1436         &dev_attr_name.attr,
1437         &dev_attr_current_snap.attr,
1438         &dev_attr_refresh.attr,
1439         &dev_attr_create_snap.attr,
1440         &dev_attr_rollback_snap.attr,
1441         NULL
1442 };
1443
1444 static struct attribute_group rbd_attr_group = {
1445         .attrs = rbd_attrs,
1446 };
1447
1448 static const struct attribute_group *rbd_attr_groups[] = {
1449         &rbd_attr_group,
1450         NULL
1451 };
1452
1453 static void rbd_sysfs_dev_release(struct device *dev)
1454 {
1455 }
1456
1457 static struct device_type rbd_device_type = {
1458         .name           = "rbd",
1459         .groups         = rbd_attr_groups,
1460         .release        = rbd_sysfs_dev_release,
1461 };
1462
1463
1464 /*
1465   sysfs - snapshots
1466 */
1467
1468 static ssize_t rbd_snap_size_show(struct device *dev,
1469                                   struct device_attribute *attr,
1470                                   char *buf)
1471 {
1472         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1473
1474         return sprintf(buf, "%lld\n", (long long)snap->size);
1475 }
1476
1477 static ssize_t rbd_snap_id_show(struct device *dev,
1478                                 struct device_attribute *attr,
1479                                 char *buf)
1480 {
1481         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1482
1483         return sprintf(buf, "%lld\n", (long long)snap->id);
1484 }
1485
1486 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
1487 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
1488
1489 static struct attribute *rbd_snap_attrs[] = {
1490         &dev_attr_snap_size.attr,
1491         &dev_attr_snap_id.attr,
1492         NULL,
1493 };
1494
1495 static struct attribute_group rbd_snap_attr_group = {
1496         .attrs = rbd_snap_attrs,
1497 };
1498
1499 static void rbd_snap_dev_release(struct device *dev)
1500 {
1501         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1502         kfree(snap->name);
1503         kfree(snap);
1504 }
1505
1506 static const struct attribute_group *rbd_snap_attr_groups[] = {
1507         &rbd_snap_attr_group,
1508         NULL
1509 };
1510
1511 static struct device_type rbd_snap_device_type = {
1512         .groups         = rbd_snap_attr_groups,
1513         .release        = rbd_snap_dev_release,
1514 };
1515
1516 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
1517                                   struct rbd_snap *snap)
1518 {
1519         list_del(&snap->node);
1520         device_unregister(&snap->dev);
1521 }
1522
1523 static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
1524                                   struct rbd_snap *snap,
1525                                   struct device *parent)
1526 {
1527         struct device *dev = &snap->dev;
1528         int ret;
1529
1530         dev->type = &rbd_snap_device_type;
1531         dev->parent = parent;
1532         dev->release = rbd_snap_dev_release;
1533         dev_set_name(dev, "snap_%s", snap->name);
1534         ret = device_register(dev);
1535
1536         return ret;
1537 }
1538
1539 static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
1540                               int i, const char *name,
1541                               struct rbd_snap **snapp)
1542 {
1543         int ret;
1544         struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
1545         if (!snap)
1546                 return -ENOMEM;
1547         snap->name = kstrdup(name, GFP_KERNEL);
1548         snap->size = rbd_dev->header.snap_sizes[i];
1549         snap->id = rbd_dev->header.snapc->snaps[i];
1550         if (device_is_registered(&rbd_dev->dev)) {
1551                 ret = rbd_register_snap_dev(rbd_dev, snap,
1552                                              &rbd_dev->dev);
1553                 if (ret < 0)
1554                         goto err;
1555         }
1556         *snapp = snap;
1557         return 0;
1558 err:
1559         kfree(snap->name);
1560         kfree(snap);
1561         return ret;
1562 }
1563
1564 /*
1565  * search for the previous snap in a null delimited string list
1566  */
1567 const char *rbd_prev_snap_name(const char *name, const char *start)
1568 {
1569         if (name < start + 2)
1570                 return NULL;
1571
1572         name -= 2;
1573         while (*name) {
1574                 if (name == start)
1575                         return start;
1576                 name--;
1577         }
1578         return name + 1;
1579 }
1580
1581 /*
1582  * compare the old list of snapshots that we have to what's in the header
1583  * and update it accordingly. Note that the header holds the snapshots
1584  * in a reverse order (from newest to oldest) and we need to go from
1585  * older to new so that we don't get a duplicate snap name when
1586  * doing the process (e.g., removed snapshot and recreated a new
1587  * one with the same name.
1588  */
1589 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
1590 {
1591         const char *name, *first_name;
1592         int i = rbd_dev->header.total_snaps;
1593         struct rbd_snap *snap, *old_snap = NULL;
1594         int ret;
1595         struct list_head *p, *n;
1596
1597         first_name = rbd_dev->header.snap_names;
1598         name = first_name + rbd_dev->header.snap_names_len;
1599
1600         list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
1601                 u64 cur_id;
1602
1603                 old_snap = list_entry(p, struct rbd_snap, node);
1604
1605                 if (i)
1606                         cur_id = rbd_dev->header.snapc->snaps[i - 1];
1607
1608                 if (!i || old_snap->id < cur_id) {
1609                         /* old_snap->id was skipped, thus was removed */
1610                         __rbd_remove_snap_dev(rbd_dev, old_snap);
1611                         continue;
1612                 }
1613                 if (old_snap->id == cur_id) {
1614                         /* we have this snapshot already */
1615                         i--;
1616                         name = rbd_prev_snap_name(name, first_name);
1617                         continue;
1618                 }
1619                 for (; i > 0;
1620                      i--, name = rbd_prev_snap_name(name, first_name)) {
1621                         if (!name) {
1622                                 WARN_ON(1);
1623                                 return -EINVAL;
1624                         }
1625                         cur_id = rbd_dev->header.snapc->snaps[i];
1626                         /* snapshot removal? handle it above */
1627                         if (cur_id >= old_snap->id)
1628                                 break;
1629                         /* a new snapshot */
1630                         ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
1631                         if (ret < 0)
1632                                 return ret;
1633
1634                         /* note that we add it backward so using n and not p */
1635                         list_add(&snap->node, n);
1636                         p = &snap->node;
1637                 }
1638         }
1639         /* we're done going over the old snap list, just add what's left */
1640         for (; i > 0; i--) {
1641                 name = rbd_prev_snap_name(name, first_name);
1642                 if (!name) {
1643                         WARN_ON(1);
1644                         return -EINVAL;
1645                 }
1646                 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
1647                 if (ret < 0)
1648                         return ret;
1649                 list_add(&snap->node, &rbd_dev->snaps);
1650         }
1651
1652         return 0;
1653 }
1654
1655
1656 static void rbd_root_dev_release(struct device *dev)
1657 {
1658 }
1659
1660 static struct device rbd_root_dev = {
1661         .init_name =    "rbd",
1662         .release =      rbd_root_dev_release,
1663 };
1664
1665 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
1666 {
1667         int ret = -ENOMEM;
1668         struct device *dev;
1669         struct rbd_snap *snap;
1670
1671         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1672         dev = &rbd_dev->dev;
1673
1674         dev->bus = &rbd_bus_type;
1675         dev->type = &rbd_device_type;
1676         dev->parent = &rbd_root_dev;
1677         dev->release = rbd_dev_release;
1678         dev_set_name(dev, "%d", rbd_dev->id);
1679         ret = device_register(dev);
1680         if (ret < 0)
1681                 goto done_free;
1682
1683         list_for_each_entry(snap, &rbd_dev->snaps, node) {
1684                 ret = rbd_register_snap_dev(rbd_dev, snap,
1685                                              &rbd_dev->dev);
1686                 if (ret < 0)
1687                         break;
1688         }
1689
1690         mutex_unlock(&ctl_mutex);
1691         return 0;
1692 done_free:
1693         mutex_unlock(&ctl_mutex);
1694         return ret;
1695 }
1696
1697 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
1698 {
1699         device_unregister(&rbd_dev->dev);
1700 }
1701
1702 static ssize_t rbd_add(struct bus_type *bus, const char *buf, size_t count)
1703 {
1704         struct ceph_osd_client *osdc;
1705         struct rbd_device *rbd_dev;
1706         ssize_t rc = -ENOMEM;
1707         int irc, new_id = 0;
1708         struct list_head *tmp;
1709         char *mon_dev_name;
1710         char *options;
1711
1712         if (!try_module_get(THIS_MODULE))
1713                 return -ENODEV;
1714
1715         mon_dev_name = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
1716         if (!mon_dev_name)
1717                 goto err_out_mod;
1718
1719         options = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
1720         if (!options)
1721                 goto err_mon_dev;
1722
1723         /* new rbd_device object */
1724         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
1725         if (!rbd_dev)
1726                 goto err_out_opt;
1727
1728         /* static rbd_device initialization */
1729         spin_lock_init(&rbd_dev->lock);
1730         INIT_LIST_HEAD(&rbd_dev->node);
1731         INIT_LIST_HEAD(&rbd_dev->snaps);
1732
1733         /* generate unique id: find highest unique id, add one */
1734         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1735
1736         list_for_each(tmp, &rbd_dev_list) {
1737                 struct rbd_device *rbd_dev;
1738
1739                 rbd_dev = list_entry(tmp, struct rbd_device, node);
1740                 if (rbd_dev->id >= new_id)
1741                         new_id = rbd_dev->id + 1;
1742         }
1743
1744         rbd_dev->id = new_id;
1745
1746         /* add to global list */
1747         list_add_tail(&rbd_dev->node, &rbd_dev_list);
1748
1749         /* parse add command */
1750         if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s "
1751                    "%" __stringify(RBD_MAX_OPT_LEN) "s "
1752                    "%" __stringify(RBD_MAX_POOL_NAME_LEN) "s "
1753                    "%" __stringify(RBD_MAX_OBJ_NAME_LEN) "s"
1754                    "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
1755                    mon_dev_name, options, rbd_dev->pool_name,
1756                    rbd_dev->obj, rbd_dev->snap_name) < 4) {
1757                 rc = -EINVAL;
1758                 goto err_out_slot;
1759         }
1760
1761         if (rbd_dev->snap_name[0] == 0)
1762                 rbd_dev->snap_name[0] = '-';
1763
1764         rbd_dev->obj_len = strlen(rbd_dev->obj);
1765         snprintf(rbd_dev->obj_md_name, sizeof(rbd_dev->obj_md_name), "%s%s",
1766                  rbd_dev->obj, RBD_SUFFIX);
1767
1768         /* initialize rest of new object */
1769         snprintf(rbd_dev->name, DEV_NAME_LEN, DRV_NAME "%d", rbd_dev->id);
1770         rc = rbd_get_client(rbd_dev, mon_dev_name, options);
1771         if (rc < 0)
1772                 goto err_out_slot;
1773
1774         mutex_unlock(&ctl_mutex);
1775
1776         /* pick the pool */
1777         osdc = &rbd_dev->client->osdc;
1778         rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
1779         if (rc < 0)
1780                 goto err_out_client;
1781         rbd_dev->poolid = rc;
1782
1783         /* register our block device */
1784         irc = register_blkdev(0, rbd_dev->name);
1785         if (irc < 0) {
1786                 rc = irc;
1787                 goto err_out_client;
1788         }
1789         rbd_dev->major = irc;
1790
1791         rc = rbd_bus_add_dev(rbd_dev);
1792         if (rc)
1793                 goto err_out_disk;
1794         /* set up and announce blkdev mapping */
1795         rc = rbd_init_disk(rbd_dev);
1796         if (rc)
1797                 goto err_out_blkdev;
1798
1799         return count;
1800
1801 err_out_blkdev:
1802         unregister_blkdev(rbd_dev->major, rbd_dev->name);
1803 err_out_disk:
1804         rbd_free_disk(rbd_dev);
1805 err_out_client:
1806         rbd_put_client(rbd_dev);
1807         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1808 err_out_slot:
1809         list_del_init(&rbd_dev->node);
1810         mutex_unlock(&ctl_mutex);
1811
1812         kfree(rbd_dev);
1813 err_out_opt:
1814         kfree(options);
1815 err_mon_dev:
1816         kfree(mon_dev_name);
1817 err_out_mod:
1818         dout("Error adding device %s\n", buf);
1819         module_put(THIS_MODULE);
1820         return rc;
1821 }
1822
1823 static struct rbd_device *__rbd_get_dev(unsigned long id)
1824 {
1825         struct list_head *tmp;
1826         struct rbd_device *rbd_dev;
1827
1828         list_for_each(tmp, &rbd_dev_list) {
1829                 rbd_dev = list_entry(tmp, struct rbd_device, node);
1830                 if (rbd_dev->id == id)
1831                         return rbd_dev;
1832         }
1833         return NULL;
1834 }
1835
1836 static void rbd_dev_release(struct device *dev)
1837 {
1838         struct rbd_device *rbd_dev =
1839                         container_of(dev, struct rbd_device, dev);
1840
1841         rbd_put_client(rbd_dev);
1842
1843         /* clean up and free blkdev */
1844         rbd_free_disk(rbd_dev);
1845         unregister_blkdev(rbd_dev->major, rbd_dev->name);
1846         kfree(rbd_dev);
1847
1848         /* release module ref */
1849         module_put(THIS_MODULE);
1850 }
1851
1852 static ssize_t rbd_remove(struct bus_type *bus,
1853                           const char *buf,
1854                           size_t count)
1855 {
1856         struct rbd_device *rbd_dev = NULL;
1857         int target_id, rc;
1858         unsigned long ul;
1859         int ret = count;
1860
1861         rc = strict_strtoul(buf, 10, &ul);
1862         if (rc)
1863                 return rc;
1864
1865         /* convert to int; abort if we lost anything in the conversion */
1866         target_id = (int) ul;
1867         if (target_id != ul)
1868                 return -EINVAL;
1869
1870         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1871
1872         rbd_dev = __rbd_get_dev(target_id);
1873         if (!rbd_dev) {
1874                 ret = -ENOENT;
1875                 goto done;
1876         }
1877
1878         list_del_init(&rbd_dev->node);
1879
1880         __rbd_remove_all_snaps(rbd_dev);
1881         rbd_bus_del_dev(rbd_dev);
1882
1883 done:
1884         mutex_unlock(&ctl_mutex);
1885         return ret;
1886 }
1887
1888 static ssize_t rbd_snap_add(struct device *dev,
1889                             struct device_attribute *attr,
1890                             const char *buf,
1891                             size_t count)
1892 {
1893         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1894         int ret;
1895         char *name = kmalloc(count + 1, GFP_KERNEL);
1896         if (!name)
1897                 return -ENOMEM;
1898
1899         snprintf(name, count, "%s", buf);
1900
1901         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1902
1903         ret = rbd_header_add_snap(rbd_dev,
1904                                   name, GFP_KERNEL);
1905         if (ret < 0)
1906                 goto done_unlock;
1907
1908         ret = __rbd_update_snaps(rbd_dev);
1909         if (ret < 0)
1910                 goto done_unlock;
1911
1912         ret = count;
1913 done_unlock:
1914         mutex_unlock(&ctl_mutex);
1915         kfree(name);
1916         return ret;
1917 }
1918
1919 static ssize_t rbd_snap_rollback(struct device *dev,
1920                                  struct device_attribute *attr,
1921                                  const char *buf,
1922                                  size_t count)
1923 {
1924         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1925         int ret;
1926         u64 snapid;
1927         u64 cur_ofs;
1928         char *seg_name = NULL;
1929         char *snap_name = kmalloc(count + 1, GFP_KERNEL);
1930         ret = -ENOMEM;
1931         if (!snap_name)
1932                 return ret;
1933
1934         /* parse snaps add command */
1935         snprintf(snap_name, count, "%s", buf);
1936         seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1937         if (!seg_name)
1938                 goto done;
1939
1940         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1941
1942         ret = snap_by_name(&rbd_dev->header, snap_name, &snapid, NULL);
1943         if (ret < 0)
1944                 goto done_unlock;
1945
1946         dout("snapid=%lld\n", snapid);
1947
1948         cur_ofs = 0;
1949         while (cur_ofs < rbd_dev->header.image_size) {
1950                 cur_ofs += rbd_get_segment(&rbd_dev->header,
1951                                            rbd_dev->obj,
1952                                            cur_ofs, (u64)-1,
1953                                            seg_name, NULL);
1954                 dout("seg_name=%s\n", seg_name);
1955
1956                 ret = rbd_req_sync_rollback_obj(rbd_dev, snapid, seg_name);
1957                 if (ret < 0)
1958                         pr_warning("could not roll back obj %s err=%d\n",
1959                                    seg_name, ret);
1960         }
1961
1962         ret = __rbd_update_snaps(rbd_dev);
1963         if (ret < 0)
1964                 goto done_unlock;
1965
1966         ret = count;
1967
1968 done_unlock:
1969         mutex_unlock(&ctl_mutex);
1970 done:
1971         kfree(seg_name);
1972         kfree(snap_name);
1973
1974         return ret;
1975 }
1976
1977 static struct bus_attribute rbd_bus_attrs[] = {
1978         __ATTR(add, S_IWUSR, NULL, rbd_add),
1979         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
1980         __ATTR_NULL
1981 };
1982
1983 /*
1984  * create control files in sysfs
1985  * /sys/bus/rbd/...
1986  */
1987 static int rbd_sysfs_init(void)
1988 {
1989         int ret;
1990
1991         rbd_bus_type.bus_attrs = rbd_bus_attrs;
1992
1993         ret = bus_register(&rbd_bus_type);
1994          if (ret < 0)
1995                 return ret;
1996
1997         ret = device_register(&rbd_root_dev);
1998
1999         return ret;
2000 }
2001
2002 static void rbd_sysfs_cleanup(void)
2003 {
2004         device_unregister(&rbd_root_dev);
2005         bus_unregister(&rbd_bus_type);
2006 }
2007
2008 int __init rbd_init(void)
2009 {
2010         int rc;
2011
2012         rc = rbd_sysfs_init();
2013         if (rc)
2014                 return rc;
2015         spin_lock_init(&node_lock);
2016         pr_info("loaded " DRV_NAME_LONG "\n");
2017         return 0;
2018 }
2019
2020 void __exit rbd_exit(void)
2021 {
2022         rbd_sysfs_cleanup();
2023 }
2024
2025 module_init(rbd_init);
2026 module_exit(rbd_exit);
2027
2028 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2029 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2030 MODULE_DESCRIPTION("rados block device");
2031
2032 /* following authorship retained from original osdblk.c */
2033 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2034
2035 MODULE_LICENSE("GPL");