]> git.openfabrics.org - ~shefty/rdma-dev.git/blob - drivers/md/dm-raid1.c
dm raid1: dont use map_context
[~shefty/rdma-dev.git] / drivers / md / dm-raid1.c
1 /*
2  * Copyright (C) 2003 Sistina Software Limited.
3  * Copyright (C) 2005-2008 Red Hat, Inc. All rights reserved.
4  *
5  * This file is released under the GPL.
6  */
7
8 #include "dm-bio-record.h"
9
10 #include <linux/init.h>
11 #include <linux/mempool.h>
12 #include <linux/module.h>
13 #include <linux/pagemap.h>
14 #include <linux/slab.h>
15 #include <linux/workqueue.h>
16 #include <linux/device-mapper.h>
17 #include <linux/dm-io.h>
18 #include <linux/dm-dirty-log.h>
19 #include <linux/dm-kcopyd.h>
20 #include <linux/dm-region-hash.h>
21
22 #define DM_MSG_PREFIX "raid1"
23
24 #define MAX_RECOVERY 1  /* Maximum number of regions recovered in parallel. */
25
26 #define DM_RAID1_HANDLE_ERRORS 0x01
27 #define errors_handled(p)       ((p)->features & DM_RAID1_HANDLE_ERRORS)
28
29 static DECLARE_WAIT_QUEUE_HEAD(_kmirrord_recovery_stopped);
30
31 /*-----------------------------------------------------------------
32  * Mirror set structures.
33  *---------------------------------------------------------------*/
34 enum dm_raid1_error {
35         DM_RAID1_WRITE_ERROR,
36         DM_RAID1_FLUSH_ERROR,
37         DM_RAID1_SYNC_ERROR,
38         DM_RAID1_READ_ERROR
39 };
40
41 struct mirror {
42         struct mirror_set *ms;
43         atomic_t error_count;
44         unsigned long error_type;
45         struct dm_dev *dev;
46         sector_t offset;
47 };
48
49 struct mirror_set {
50         struct dm_target *ti;
51         struct list_head list;
52
53         uint64_t features;
54
55         spinlock_t lock;        /* protects the lists */
56         struct bio_list reads;
57         struct bio_list writes;
58         struct bio_list failures;
59         struct bio_list holds;  /* bios are waiting until suspend */
60
61         struct dm_region_hash *rh;
62         struct dm_kcopyd_client *kcopyd_client;
63         struct dm_io_client *io_client;
64
65         /* recovery */
66         region_t nr_regions;
67         int in_sync;
68         int log_failure;
69         int leg_failure;
70         atomic_t suspend;
71
72         atomic_t default_mirror;        /* Default mirror */
73
74         struct workqueue_struct *kmirrord_wq;
75         struct work_struct kmirrord_work;
76         struct timer_list timer;
77         unsigned long timer_pending;
78
79         struct work_struct trigger_event;
80
81         unsigned nr_mirrors;
82         struct mirror mirror[0];
83 };
84
85 static void wakeup_mirrord(void *context)
86 {
87         struct mirror_set *ms = context;
88
89         queue_work(ms->kmirrord_wq, &ms->kmirrord_work);
90 }
91
92 static void delayed_wake_fn(unsigned long data)
93 {
94         struct mirror_set *ms = (struct mirror_set *) data;
95
96         clear_bit(0, &ms->timer_pending);
97         wakeup_mirrord(ms);
98 }
99
100 static void delayed_wake(struct mirror_set *ms)
101 {
102         if (test_and_set_bit(0, &ms->timer_pending))
103                 return;
104
105         ms->timer.expires = jiffies + HZ / 5;
106         ms->timer.data = (unsigned long) ms;
107         ms->timer.function = delayed_wake_fn;
108         add_timer(&ms->timer);
109 }
110
111 static void wakeup_all_recovery_waiters(void *context)
112 {
113         wake_up_all(&_kmirrord_recovery_stopped);
114 }
115
116 static void queue_bio(struct mirror_set *ms, struct bio *bio, int rw)
117 {
118         unsigned long flags;
119         int should_wake = 0;
120         struct bio_list *bl;
121
122         bl = (rw == WRITE) ? &ms->writes : &ms->reads;
123         spin_lock_irqsave(&ms->lock, flags);
124         should_wake = !(bl->head);
125         bio_list_add(bl, bio);
126         spin_unlock_irqrestore(&ms->lock, flags);
127
128         if (should_wake)
129                 wakeup_mirrord(ms);
130 }
131
132 static void dispatch_bios(void *context, struct bio_list *bio_list)
133 {
134         struct mirror_set *ms = context;
135         struct bio *bio;
136
137         while ((bio = bio_list_pop(bio_list)))
138                 queue_bio(ms, bio, WRITE);
139 }
140
141 struct dm_raid1_bio_record {
142         struct mirror *m;
143         /* if details->bi_bdev == NULL, details were not saved */
144         struct dm_bio_details details;
145         region_t write_region;
146 };
147
148 /*
149  * Every mirror should look like this one.
150  */
151 #define DEFAULT_MIRROR 0
152
153 /*
154  * This is yucky.  We squirrel the mirror struct away inside
155  * bi_next for read/write buffers.  This is safe since the bh
156  * doesn't get submitted to the lower levels of block layer.
157  */
158 static struct mirror *bio_get_m(struct bio *bio)
159 {
160         return (struct mirror *) bio->bi_next;
161 }
162
163 static void bio_set_m(struct bio *bio, struct mirror *m)
164 {
165         bio->bi_next = (struct bio *) m;
166 }
167
168 static struct mirror *get_default_mirror(struct mirror_set *ms)
169 {
170         return &ms->mirror[atomic_read(&ms->default_mirror)];
171 }
172
173 static void set_default_mirror(struct mirror *m)
174 {
175         struct mirror_set *ms = m->ms;
176         struct mirror *m0 = &(ms->mirror[0]);
177
178         atomic_set(&ms->default_mirror, m - m0);
179 }
180
181 static struct mirror *get_valid_mirror(struct mirror_set *ms)
182 {
183         struct mirror *m;
184
185         for (m = ms->mirror; m < ms->mirror + ms->nr_mirrors; m++)
186                 if (!atomic_read(&m->error_count))
187                         return m;
188
189         return NULL;
190 }
191
192 /* fail_mirror
193  * @m: mirror device to fail
194  * @error_type: one of the enum's, DM_RAID1_*_ERROR
195  *
196  * If errors are being handled, record the type of
197  * error encountered for this device.  If this type
198  * of error has already been recorded, we can return;
199  * otherwise, we must signal userspace by triggering
200  * an event.  Additionally, if the device is the
201  * primary device, we must choose a new primary, but
202  * only if the mirror is in-sync.
203  *
204  * This function must not block.
205  */
206 static void fail_mirror(struct mirror *m, enum dm_raid1_error error_type)
207 {
208         struct mirror_set *ms = m->ms;
209         struct mirror *new;
210
211         ms->leg_failure = 1;
212
213         /*
214          * error_count is used for nothing more than a
215          * simple way to tell if a device has encountered
216          * errors.
217          */
218         atomic_inc(&m->error_count);
219
220         if (test_and_set_bit(error_type, &m->error_type))
221                 return;
222
223         if (!errors_handled(ms))
224                 return;
225
226         if (m != get_default_mirror(ms))
227                 goto out;
228
229         if (!ms->in_sync) {
230                 /*
231                  * Better to issue requests to same failing device
232                  * than to risk returning corrupt data.
233                  */
234                 DMERR("Primary mirror (%s) failed while out-of-sync: "
235                       "Reads may fail.", m->dev->name);
236                 goto out;
237         }
238
239         new = get_valid_mirror(ms);
240         if (new)
241                 set_default_mirror(new);
242         else
243                 DMWARN("All sides of mirror have failed.");
244
245 out:
246         schedule_work(&ms->trigger_event);
247 }
248
249 static int mirror_flush(struct dm_target *ti)
250 {
251         struct mirror_set *ms = ti->private;
252         unsigned long error_bits;
253
254         unsigned int i;
255         struct dm_io_region io[ms->nr_mirrors];
256         struct mirror *m;
257         struct dm_io_request io_req = {
258                 .bi_rw = WRITE_FLUSH,
259                 .mem.type = DM_IO_KMEM,
260                 .mem.ptr.addr = NULL,
261                 .client = ms->io_client,
262         };
263
264         for (i = 0, m = ms->mirror; i < ms->nr_mirrors; i++, m++) {
265                 io[i].bdev = m->dev->bdev;
266                 io[i].sector = 0;
267                 io[i].count = 0;
268         }
269
270         error_bits = -1;
271         dm_io(&io_req, ms->nr_mirrors, io, &error_bits);
272         if (unlikely(error_bits != 0)) {
273                 for (i = 0; i < ms->nr_mirrors; i++)
274                         if (test_bit(i, &error_bits))
275                                 fail_mirror(ms->mirror + i,
276                                             DM_RAID1_FLUSH_ERROR);
277                 return -EIO;
278         }
279
280         return 0;
281 }
282
283 /*-----------------------------------------------------------------
284  * Recovery.
285  *
286  * When a mirror is first activated we may find that some regions
287  * are in the no-sync state.  We have to recover these by
288  * recopying from the default mirror to all the others.
289  *---------------------------------------------------------------*/
290 static void recovery_complete(int read_err, unsigned long write_err,
291                               void *context)
292 {
293         struct dm_region *reg = context;
294         struct mirror_set *ms = dm_rh_region_context(reg);
295         int m, bit = 0;
296
297         if (read_err) {
298                 /* Read error means the failure of default mirror. */
299                 DMERR_LIMIT("Unable to read primary mirror during recovery");
300                 fail_mirror(get_default_mirror(ms), DM_RAID1_SYNC_ERROR);
301         }
302
303         if (write_err) {
304                 DMERR_LIMIT("Write error during recovery (error = 0x%lx)",
305                             write_err);
306                 /*
307                  * Bits correspond to devices (excluding default mirror).
308                  * The default mirror cannot change during recovery.
309                  */
310                 for (m = 0; m < ms->nr_mirrors; m++) {
311                         if (&ms->mirror[m] == get_default_mirror(ms))
312                                 continue;
313                         if (test_bit(bit, &write_err))
314                                 fail_mirror(ms->mirror + m,
315                                             DM_RAID1_SYNC_ERROR);
316                         bit++;
317                 }
318         }
319
320         dm_rh_recovery_end(reg, !(read_err || write_err));
321 }
322
323 static int recover(struct mirror_set *ms, struct dm_region *reg)
324 {
325         int r;
326         unsigned i;
327         struct dm_io_region from, to[DM_KCOPYD_MAX_REGIONS], *dest;
328         struct mirror *m;
329         unsigned long flags = 0;
330         region_t key = dm_rh_get_region_key(reg);
331         sector_t region_size = dm_rh_get_region_size(ms->rh);
332
333         /* fill in the source */
334         m = get_default_mirror(ms);
335         from.bdev = m->dev->bdev;
336         from.sector = m->offset + dm_rh_region_to_sector(ms->rh, key);
337         if (key == (ms->nr_regions - 1)) {
338                 /*
339                  * The final region may be smaller than
340                  * region_size.
341                  */
342                 from.count = ms->ti->len & (region_size - 1);
343                 if (!from.count)
344                         from.count = region_size;
345         } else
346                 from.count = region_size;
347
348         /* fill in the destinations */
349         for (i = 0, dest = to; i < ms->nr_mirrors; i++) {
350                 if (&ms->mirror[i] == get_default_mirror(ms))
351                         continue;
352
353                 m = ms->mirror + i;
354                 dest->bdev = m->dev->bdev;
355                 dest->sector = m->offset + dm_rh_region_to_sector(ms->rh, key);
356                 dest->count = from.count;
357                 dest++;
358         }
359
360         /* hand to kcopyd */
361         if (!errors_handled(ms))
362                 set_bit(DM_KCOPYD_IGNORE_ERROR, &flags);
363
364         r = dm_kcopyd_copy(ms->kcopyd_client, &from, ms->nr_mirrors - 1, to,
365                            flags, recovery_complete, reg);
366
367         return r;
368 }
369
370 static void do_recovery(struct mirror_set *ms)
371 {
372         struct dm_region *reg;
373         struct dm_dirty_log *log = dm_rh_dirty_log(ms->rh);
374         int r;
375
376         /*
377          * Start quiescing some regions.
378          */
379         dm_rh_recovery_prepare(ms->rh);
380
381         /*
382          * Copy any already quiesced regions.
383          */
384         while ((reg = dm_rh_recovery_start(ms->rh))) {
385                 r = recover(ms, reg);
386                 if (r)
387                         dm_rh_recovery_end(reg, 0);
388         }
389
390         /*
391          * Update the in sync flag.
392          */
393         if (!ms->in_sync &&
394             (log->type->get_sync_count(log) == ms->nr_regions)) {
395                 /* the sync is complete */
396                 dm_table_event(ms->ti->table);
397                 ms->in_sync = 1;
398         }
399 }
400
401 /*-----------------------------------------------------------------
402  * Reads
403  *---------------------------------------------------------------*/
404 static struct mirror *choose_mirror(struct mirror_set *ms, sector_t sector)
405 {
406         struct mirror *m = get_default_mirror(ms);
407
408         do {
409                 if (likely(!atomic_read(&m->error_count)))
410                         return m;
411
412                 if (m-- == ms->mirror)
413                         m += ms->nr_mirrors;
414         } while (m != get_default_mirror(ms));
415
416         return NULL;
417 }
418
419 static int default_ok(struct mirror *m)
420 {
421         struct mirror *default_mirror = get_default_mirror(m->ms);
422
423         return !atomic_read(&default_mirror->error_count);
424 }
425
426 static int mirror_available(struct mirror_set *ms, struct bio *bio)
427 {
428         struct dm_dirty_log *log = dm_rh_dirty_log(ms->rh);
429         region_t region = dm_rh_bio_to_region(ms->rh, bio);
430
431         if (log->type->in_sync(log, region, 0))
432                 return choose_mirror(ms,  bio->bi_sector) ? 1 : 0;
433
434         return 0;
435 }
436
437 /*
438  * remap a buffer to a particular mirror.
439  */
440 static sector_t map_sector(struct mirror *m, struct bio *bio)
441 {
442         if (unlikely(!bio->bi_size))
443                 return 0;
444         return m->offset + dm_target_offset(m->ms->ti, bio->bi_sector);
445 }
446
447 static void map_bio(struct mirror *m, struct bio *bio)
448 {
449         bio->bi_bdev = m->dev->bdev;
450         bio->bi_sector = map_sector(m, bio);
451 }
452
453 static void map_region(struct dm_io_region *io, struct mirror *m,
454                        struct bio *bio)
455 {
456         io->bdev = m->dev->bdev;
457         io->sector = map_sector(m, bio);
458         io->count = bio->bi_size >> 9;
459 }
460
461 static void hold_bio(struct mirror_set *ms, struct bio *bio)
462 {
463         /*
464          * Lock is required to avoid race condition during suspend
465          * process.
466          */
467         spin_lock_irq(&ms->lock);
468
469         if (atomic_read(&ms->suspend)) {
470                 spin_unlock_irq(&ms->lock);
471
472                 /*
473                  * If device is suspended, complete the bio.
474                  */
475                 if (dm_noflush_suspending(ms->ti))
476                         bio_endio(bio, DM_ENDIO_REQUEUE);
477                 else
478                         bio_endio(bio, -EIO);
479                 return;
480         }
481
482         /*
483          * Hold bio until the suspend is complete.
484          */
485         bio_list_add(&ms->holds, bio);
486         spin_unlock_irq(&ms->lock);
487 }
488
489 /*-----------------------------------------------------------------
490  * Reads
491  *---------------------------------------------------------------*/
492 static void read_callback(unsigned long error, void *context)
493 {
494         struct bio *bio = context;
495         struct mirror *m;
496
497         m = bio_get_m(bio);
498         bio_set_m(bio, NULL);
499
500         if (likely(!error)) {
501                 bio_endio(bio, 0);
502                 return;
503         }
504
505         fail_mirror(m, DM_RAID1_READ_ERROR);
506
507         if (likely(default_ok(m)) || mirror_available(m->ms, bio)) {
508                 DMWARN_LIMIT("Read failure on mirror device %s.  "
509                              "Trying alternative device.",
510                              m->dev->name);
511                 queue_bio(m->ms, bio, bio_rw(bio));
512                 return;
513         }
514
515         DMERR_LIMIT("Read failure on mirror device %s.  Failing I/O.",
516                     m->dev->name);
517         bio_endio(bio, -EIO);
518 }
519
520 /* Asynchronous read. */
521 static void read_async_bio(struct mirror *m, struct bio *bio)
522 {
523         struct dm_io_region io;
524         struct dm_io_request io_req = {
525                 .bi_rw = READ,
526                 .mem.type = DM_IO_BVEC,
527                 .mem.ptr.bvec = bio->bi_io_vec + bio->bi_idx,
528                 .notify.fn = read_callback,
529                 .notify.context = bio,
530                 .client = m->ms->io_client,
531         };
532
533         map_region(&io, m, bio);
534         bio_set_m(bio, m);
535         BUG_ON(dm_io(&io_req, 1, &io, NULL));
536 }
537
538 static inline int region_in_sync(struct mirror_set *ms, region_t region,
539                                  int may_block)
540 {
541         int state = dm_rh_get_state(ms->rh, region, may_block);
542         return state == DM_RH_CLEAN || state == DM_RH_DIRTY;
543 }
544
545 static void do_reads(struct mirror_set *ms, struct bio_list *reads)
546 {
547         region_t region;
548         struct bio *bio;
549         struct mirror *m;
550
551         while ((bio = bio_list_pop(reads))) {
552                 region = dm_rh_bio_to_region(ms->rh, bio);
553                 m = get_default_mirror(ms);
554
555                 /*
556                  * We can only read balance if the region is in sync.
557                  */
558                 if (likely(region_in_sync(ms, region, 1)))
559                         m = choose_mirror(ms, bio->bi_sector);
560                 else if (m && atomic_read(&m->error_count))
561                         m = NULL;
562
563                 if (likely(m))
564                         read_async_bio(m, bio);
565                 else
566                         bio_endio(bio, -EIO);
567         }
568 }
569
570 /*-----------------------------------------------------------------
571  * Writes.
572  *
573  * We do different things with the write io depending on the
574  * state of the region that it's in:
575  *
576  * SYNC:        increment pending, use kcopyd to write to *all* mirrors
577  * RECOVERING:  delay the io until recovery completes
578  * NOSYNC:      increment pending, just write to the default mirror
579  *---------------------------------------------------------------*/
580
581
582 static void write_callback(unsigned long error, void *context)
583 {
584         unsigned i, ret = 0;
585         struct bio *bio = (struct bio *) context;
586         struct mirror_set *ms;
587         int should_wake = 0;
588         unsigned long flags;
589
590         ms = bio_get_m(bio)->ms;
591         bio_set_m(bio, NULL);
592
593         /*
594          * NOTE: We don't decrement the pending count here,
595          * instead it is done by the targets endio function.
596          * This way we handle both writes to SYNC and NOSYNC
597          * regions with the same code.
598          */
599         if (likely(!error)) {
600                 bio_endio(bio, ret);
601                 return;
602         }
603
604         for (i = 0; i < ms->nr_mirrors; i++)
605                 if (test_bit(i, &error))
606                         fail_mirror(ms->mirror + i, DM_RAID1_WRITE_ERROR);
607
608         /*
609          * Need to raise event.  Since raising
610          * events can block, we need to do it in
611          * the main thread.
612          */
613         spin_lock_irqsave(&ms->lock, flags);
614         if (!ms->failures.head)
615                 should_wake = 1;
616         bio_list_add(&ms->failures, bio);
617         spin_unlock_irqrestore(&ms->lock, flags);
618         if (should_wake)
619                 wakeup_mirrord(ms);
620 }
621
622 static void do_write(struct mirror_set *ms, struct bio *bio)
623 {
624         unsigned int i;
625         struct dm_io_region io[ms->nr_mirrors], *dest = io;
626         struct mirror *m;
627         struct dm_io_request io_req = {
628                 .bi_rw = WRITE | (bio->bi_rw & WRITE_FLUSH_FUA),
629                 .mem.type = DM_IO_BVEC,
630                 .mem.ptr.bvec = bio->bi_io_vec + bio->bi_idx,
631                 .notify.fn = write_callback,
632                 .notify.context = bio,
633                 .client = ms->io_client,
634         };
635
636         if (bio->bi_rw & REQ_DISCARD) {
637                 io_req.bi_rw |= REQ_DISCARD;
638                 io_req.mem.type = DM_IO_KMEM;
639                 io_req.mem.ptr.addr = NULL;
640         }
641
642         for (i = 0, m = ms->mirror; i < ms->nr_mirrors; i++, m++)
643                 map_region(dest++, m, bio);
644
645         /*
646          * Use default mirror because we only need it to retrieve the reference
647          * to the mirror set in write_callback().
648          */
649         bio_set_m(bio, get_default_mirror(ms));
650
651         BUG_ON(dm_io(&io_req, ms->nr_mirrors, io, NULL));
652 }
653
654 static void do_writes(struct mirror_set *ms, struct bio_list *writes)
655 {
656         int state;
657         struct bio *bio;
658         struct bio_list sync, nosync, recover, *this_list = NULL;
659         struct bio_list requeue;
660         struct dm_dirty_log *log = dm_rh_dirty_log(ms->rh);
661         region_t region;
662
663         if (!writes->head)
664                 return;
665
666         /*
667          * Classify each write.
668          */
669         bio_list_init(&sync);
670         bio_list_init(&nosync);
671         bio_list_init(&recover);
672         bio_list_init(&requeue);
673
674         while ((bio = bio_list_pop(writes))) {
675                 if ((bio->bi_rw & REQ_FLUSH) ||
676                     (bio->bi_rw & REQ_DISCARD)) {
677                         bio_list_add(&sync, bio);
678                         continue;
679                 }
680
681                 region = dm_rh_bio_to_region(ms->rh, bio);
682
683                 if (log->type->is_remote_recovering &&
684                     log->type->is_remote_recovering(log, region)) {
685                         bio_list_add(&requeue, bio);
686                         continue;
687                 }
688
689                 state = dm_rh_get_state(ms->rh, region, 1);
690                 switch (state) {
691                 case DM_RH_CLEAN:
692                 case DM_RH_DIRTY:
693                         this_list = &sync;
694                         break;
695
696                 case DM_RH_NOSYNC:
697                         this_list = &nosync;
698                         break;
699
700                 case DM_RH_RECOVERING:
701                         this_list = &recover;
702                         break;
703                 }
704
705                 bio_list_add(this_list, bio);
706         }
707
708         /*
709          * Add bios that are delayed due to remote recovery
710          * back on to the write queue
711          */
712         if (unlikely(requeue.head)) {
713                 spin_lock_irq(&ms->lock);
714                 bio_list_merge(&ms->writes, &requeue);
715                 spin_unlock_irq(&ms->lock);
716                 delayed_wake(ms);
717         }
718
719         /*
720          * Increment the pending counts for any regions that will
721          * be written to (writes to recover regions are going to
722          * be delayed).
723          */
724         dm_rh_inc_pending(ms->rh, &sync);
725         dm_rh_inc_pending(ms->rh, &nosync);
726
727         /*
728          * If the flush fails on a previous call and succeeds here,
729          * we must not reset the log_failure variable.  We need
730          * userspace interaction to do that.
731          */
732         ms->log_failure = dm_rh_flush(ms->rh) ? 1 : ms->log_failure;
733
734         /*
735          * Dispatch io.
736          */
737         if (unlikely(ms->log_failure) && errors_handled(ms)) {
738                 spin_lock_irq(&ms->lock);
739                 bio_list_merge(&ms->failures, &sync);
740                 spin_unlock_irq(&ms->lock);
741                 wakeup_mirrord(ms);
742         } else
743                 while ((bio = bio_list_pop(&sync)))
744                         do_write(ms, bio);
745
746         while ((bio = bio_list_pop(&recover)))
747                 dm_rh_delay(ms->rh, bio);
748
749         while ((bio = bio_list_pop(&nosync))) {
750                 if (unlikely(ms->leg_failure) && errors_handled(ms)) {
751                         spin_lock_irq(&ms->lock);
752                         bio_list_add(&ms->failures, bio);
753                         spin_unlock_irq(&ms->lock);
754                         wakeup_mirrord(ms);
755                 } else {
756                         map_bio(get_default_mirror(ms), bio);
757                         generic_make_request(bio);
758                 }
759         }
760 }
761
762 static void do_failures(struct mirror_set *ms, struct bio_list *failures)
763 {
764         struct bio *bio;
765
766         if (likely(!failures->head))
767                 return;
768
769         /*
770          * If the log has failed, unattempted writes are being
771          * put on the holds list.  We can't issue those writes
772          * until a log has been marked, so we must store them.
773          *
774          * If a 'noflush' suspend is in progress, we can requeue
775          * the I/O's to the core.  This give userspace a chance
776          * to reconfigure the mirror, at which point the core
777          * will reissue the writes.  If the 'noflush' flag is
778          * not set, we have no choice but to return errors.
779          *
780          * Some writes on the failures list may have been
781          * submitted before the log failure and represent a
782          * failure to write to one of the devices.  It is ok
783          * for us to treat them the same and requeue them
784          * as well.
785          */
786         while ((bio = bio_list_pop(failures))) {
787                 if (!ms->log_failure) {
788                         ms->in_sync = 0;
789                         dm_rh_mark_nosync(ms->rh, bio);
790                 }
791
792                 /*
793                  * If all the legs are dead, fail the I/O.
794                  * If we have been told to handle errors, hold the bio
795                  * and wait for userspace to deal with the problem.
796                  * Otherwise pretend that the I/O succeeded. (This would
797                  * be wrong if the failed leg returned after reboot and
798                  * got replicated back to the good legs.)
799                  */
800                 if (!get_valid_mirror(ms))
801                         bio_endio(bio, -EIO);
802                 else if (errors_handled(ms))
803                         hold_bio(ms, bio);
804                 else
805                         bio_endio(bio, 0);
806         }
807 }
808
809 static void trigger_event(struct work_struct *work)
810 {
811         struct mirror_set *ms =
812                 container_of(work, struct mirror_set, trigger_event);
813
814         dm_table_event(ms->ti->table);
815 }
816
817 /*-----------------------------------------------------------------
818  * kmirrord
819  *---------------------------------------------------------------*/
820 static void do_mirror(struct work_struct *work)
821 {
822         struct mirror_set *ms = container_of(work, struct mirror_set,
823                                              kmirrord_work);
824         struct bio_list reads, writes, failures;
825         unsigned long flags;
826
827         spin_lock_irqsave(&ms->lock, flags);
828         reads = ms->reads;
829         writes = ms->writes;
830         failures = ms->failures;
831         bio_list_init(&ms->reads);
832         bio_list_init(&ms->writes);
833         bio_list_init(&ms->failures);
834         spin_unlock_irqrestore(&ms->lock, flags);
835
836         dm_rh_update_states(ms->rh, errors_handled(ms));
837         do_recovery(ms);
838         do_reads(ms, &reads);
839         do_writes(ms, &writes);
840         do_failures(ms, &failures);
841 }
842
843 /*-----------------------------------------------------------------
844  * Target functions
845  *---------------------------------------------------------------*/
846 static struct mirror_set *alloc_context(unsigned int nr_mirrors,
847                                         uint32_t region_size,
848                                         struct dm_target *ti,
849                                         struct dm_dirty_log *dl)
850 {
851         size_t len;
852         struct mirror_set *ms = NULL;
853
854         len = sizeof(*ms) + (sizeof(ms->mirror[0]) * nr_mirrors);
855
856         ms = kzalloc(len, GFP_KERNEL);
857         if (!ms) {
858                 ti->error = "Cannot allocate mirror context";
859                 return NULL;
860         }
861
862         spin_lock_init(&ms->lock);
863         bio_list_init(&ms->reads);
864         bio_list_init(&ms->writes);
865         bio_list_init(&ms->failures);
866         bio_list_init(&ms->holds);
867
868         ms->ti = ti;
869         ms->nr_mirrors = nr_mirrors;
870         ms->nr_regions = dm_sector_div_up(ti->len, region_size);
871         ms->in_sync = 0;
872         ms->log_failure = 0;
873         ms->leg_failure = 0;
874         atomic_set(&ms->suspend, 0);
875         atomic_set(&ms->default_mirror, DEFAULT_MIRROR);
876
877         ms->io_client = dm_io_client_create();
878         if (IS_ERR(ms->io_client)) {
879                 ti->error = "Error creating dm_io client";
880                 kfree(ms);
881                 return NULL;
882         }
883
884         ms->rh = dm_region_hash_create(ms, dispatch_bios, wakeup_mirrord,
885                                        wakeup_all_recovery_waiters,
886                                        ms->ti->begin, MAX_RECOVERY,
887                                        dl, region_size, ms->nr_regions);
888         if (IS_ERR(ms->rh)) {
889                 ti->error = "Error creating dirty region hash";
890                 dm_io_client_destroy(ms->io_client);
891                 kfree(ms);
892                 return NULL;
893         }
894
895         return ms;
896 }
897
898 static void free_context(struct mirror_set *ms, struct dm_target *ti,
899                          unsigned int m)
900 {
901         while (m--)
902                 dm_put_device(ti, ms->mirror[m].dev);
903
904         dm_io_client_destroy(ms->io_client);
905         dm_region_hash_destroy(ms->rh);
906         kfree(ms);
907 }
908
909 static int get_mirror(struct mirror_set *ms, struct dm_target *ti,
910                       unsigned int mirror, char **argv)
911 {
912         unsigned long long offset;
913         char dummy;
914
915         if (sscanf(argv[1], "%llu%c", &offset, &dummy) != 1) {
916                 ti->error = "Invalid offset";
917                 return -EINVAL;
918         }
919
920         if (dm_get_device(ti, argv[0], dm_table_get_mode(ti->table),
921                           &ms->mirror[mirror].dev)) {
922                 ti->error = "Device lookup failure";
923                 return -ENXIO;
924         }
925
926         ms->mirror[mirror].ms = ms;
927         atomic_set(&(ms->mirror[mirror].error_count), 0);
928         ms->mirror[mirror].error_type = 0;
929         ms->mirror[mirror].offset = offset;
930
931         return 0;
932 }
933
934 /*
935  * Create dirty log: log_type #log_params <log_params>
936  */
937 static struct dm_dirty_log *create_dirty_log(struct dm_target *ti,
938                                              unsigned argc, char **argv,
939                                              unsigned *args_used)
940 {
941         unsigned param_count;
942         struct dm_dirty_log *dl;
943         char dummy;
944
945         if (argc < 2) {
946                 ti->error = "Insufficient mirror log arguments";
947                 return NULL;
948         }
949
950         if (sscanf(argv[1], "%u%c", &param_count, &dummy) != 1) {
951                 ti->error = "Invalid mirror log argument count";
952                 return NULL;
953         }
954
955         *args_used = 2 + param_count;
956
957         if (argc < *args_used) {
958                 ti->error = "Insufficient mirror log arguments";
959                 return NULL;
960         }
961
962         dl = dm_dirty_log_create(argv[0], ti, mirror_flush, param_count,
963                                  argv + 2);
964         if (!dl) {
965                 ti->error = "Error creating mirror dirty log";
966                 return NULL;
967         }
968
969         return dl;
970 }
971
972 static int parse_features(struct mirror_set *ms, unsigned argc, char **argv,
973                           unsigned *args_used)
974 {
975         unsigned num_features;
976         struct dm_target *ti = ms->ti;
977         char dummy;
978
979         *args_used = 0;
980
981         if (!argc)
982                 return 0;
983
984         if (sscanf(argv[0], "%u%c", &num_features, &dummy) != 1) {
985                 ti->error = "Invalid number of features";
986                 return -EINVAL;
987         }
988
989         argc--;
990         argv++;
991         (*args_used)++;
992
993         if (num_features > argc) {
994                 ti->error = "Not enough arguments to support feature count";
995                 return -EINVAL;
996         }
997
998         if (!strcmp("handle_errors", argv[0]))
999                 ms->features |= DM_RAID1_HANDLE_ERRORS;
1000         else {
1001                 ti->error = "Unrecognised feature requested";
1002                 return -EINVAL;
1003         }
1004
1005         (*args_used)++;
1006
1007         return 0;
1008 }
1009
1010 /*
1011  * Construct a mirror mapping:
1012  *
1013  * log_type #log_params <log_params>
1014  * #mirrors [mirror_path offset]{2,}
1015  * [#features <features>]
1016  *
1017  * log_type is "core" or "disk"
1018  * #log_params is between 1 and 3
1019  *
1020  * If present, features must be "handle_errors".
1021  */
1022 static int mirror_ctr(struct dm_target *ti, unsigned int argc, char **argv)
1023 {
1024         int r;
1025         unsigned int nr_mirrors, m, args_used;
1026         struct mirror_set *ms;
1027         struct dm_dirty_log *dl;
1028         char dummy;
1029
1030         dl = create_dirty_log(ti, argc, argv, &args_used);
1031         if (!dl)
1032                 return -EINVAL;
1033
1034         argv += args_used;
1035         argc -= args_used;
1036
1037         if (!argc || sscanf(argv[0], "%u%c", &nr_mirrors, &dummy) != 1 ||
1038             nr_mirrors < 2 || nr_mirrors > DM_KCOPYD_MAX_REGIONS + 1) {
1039                 ti->error = "Invalid number of mirrors";
1040                 dm_dirty_log_destroy(dl);
1041                 return -EINVAL;
1042         }
1043
1044         argv++, argc--;
1045
1046         if (argc < nr_mirrors * 2) {
1047                 ti->error = "Too few mirror arguments";
1048                 dm_dirty_log_destroy(dl);
1049                 return -EINVAL;
1050         }
1051
1052         ms = alloc_context(nr_mirrors, dl->type->get_region_size(dl), ti, dl);
1053         if (!ms) {
1054                 dm_dirty_log_destroy(dl);
1055                 return -ENOMEM;
1056         }
1057
1058         /* Get the mirror parameter sets */
1059         for (m = 0; m < nr_mirrors; m++) {
1060                 r = get_mirror(ms, ti, m, argv);
1061                 if (r) {
1062                         free_context(ms, ti, m);
1063                         return r;
1064                 }
1065                 argv += 2;
1066                 argc -= 2;
1067         }
1068
1069         ti->private = ms;
1070
1071         r = dm_set_target_max_io_len(ti, dm_rh_get_region_size(ms->rh));
1072         if (r)
1073                 goto err_free_context;
1074
1075         ti->num_flush_requests = 1;
1076         ti->num_discard_requests = 1;
1077         ti->per_bio_data_size = sizeof(struct dm_raid1_bio_record);
1078         ti->discard_zeroes_data_unsupported = true;
1079
1080         ms->kmirrord_wq = alloc_workqueue("kmirrord",
1081                                           WQ_NON_REENTRANT | WQ_MEM_RECLAIM, 0);
1082         if (!ms->kmirrord_wq) {
1083                 DMERR("couldn't start kmirrord");
1084                 r = -ENOMEM;
1085                 goto err_free_context;
1086         }
1087         INIT_WORK(&ms->kmirrord_work, do_mirror);
1088         init_timer(&ms->timer);
1089         ms->timer_pending = 0;
1090         INIT_WORK(&ms->trigger_event, trigger_event);
1091
1092         r = parse_features(ms, argc, argv, &args_used);
1093         if (r)
1094                 goto err_destroy_wq;
1095
1096         argv += args_used;
1097         argc -= args_used;
1098
1099         /*
1100          * Any read-balancing addition depends on the
1101          * DM_RAID1_HANDLE_ERRORS flag being present.
1102          * This is because the decision to balance depends
1103          * on the sync state of a region.  If the above
1104          * flag is not present, we ignore errors; and
1105          * the sync state may be inaccurate.
1106          */
1107
1108         if (argc) {
1109                 ti->error = "Too many mirror arguments";
1110                 r = -EINVAL;
1111                 goto err_destroy_wq;
1112         }
1113
1114         ms->kcopyd_client = dm_kcopyd_client_create();
1115         if (IS_ERR(ms->kcopyd_client)) {
1116                 r = PTR_ERR(ms->kcopyd_client);
1117                 goto err_destroy_wq;
1118         }
1119
1120         wakeup_mirrord(ms);
1121         return 0;
1122
1123 err_destroy_wq:
1124         destroy_workqueue(ms->kmirrord_wq);
1125 err_free_context:
1126         free_context(ms, ti, ms->nr_mirrors);
1127         return r;
1128 }
1129
1130 static void mirror_dtr(struct dm_target *ti)
1131 {
1132         struct mirror_set *ms = (struct mirror_set *) ti->private;
1133
1134         del_timer_sync(&ms->timer);
1135         flush_workqueue(ms->kmirrord_wq);
1136         flush_work(&ms->trigger_event);
1137         dm_kcopyd_client_destroy(ms->kcopyd_client);
1138         destroy_workqueue(ms->kmirrord_wq);
1139         free_context(ms, ti, ms->nr_mirrors);
1140 }
1141
1142 /*
1143  * Mirror mapping function
1144  */
1145 static int mirror_map(struct dm_target *ti, struct bio *bio,
1146                       union map_info *map_context)
1147 {
1148         int r, rw = bio_rw(bio);
1149         struct mirror *m;
1150         struct mirror_set *ms = ti->private;
1151         struct dm_dirty_log *log = dm_rh_dirty_log(ms->rh);
1152         struct dm_raid1_bio_record *bio_record =
1153           dm_per_bio_data(bio, sizeof(struct dm_raid1_bio_record));
1154
1155         bio_record->details.bi_bdev = NULL;
1156
1157         if (rw == WRITE) {
1158                 /* Save region for mirror_end_io() handler */
1159                 bio_record->write_region = dm_rh_bio_to_region(ms->rh, bio);
1160                 queue_bio(ms, bio, rw);
1161                 return DM_MAPIO_SUBMITTED;
1162         }
1163
1164         r = log->type->in_sync(log, dm_rh_bio_to_region(ms->rh, bio), 0);
1165         if (r < 0 && r != -EWOULDBLOCK)
1166                 return r;
1167
1168         /*
1169          * If region is not in-sync queue the bio.
1170          */
1171         if (!r || (r == -EWOULDBLOCK)) {
1172                 if (rw == READA)
1173                         return -EWOULDBLOCK;
1174
1175                 queue_bio(ms, bio, rw);
1176                 return DM_MAPIO_SUBMITTED;
1177         }
1178
1179         /*
1180          * The region is in-sync and we can perform reads directly.
1181          * Store enough information so we can retry if it fails.
1182          */
1183         m = choose_mirror(ms, bio->bi_sector);
1184         if (unlikely(!m))
1185                 return -EIO;
1186
1187         dm_bio_record(&bio_record->details, bio);
1188         bio_record->m = m;
1189
1190         map_bio(m, bio);
1191
1192         return DM_MAPIO_REMAPPED;
1193 }
1194
1195 static int mirror_end_io(struct dm_target *ti, struct bio *bio,
1196                          int error, union map_info *map_context)
1197 {
1198         int rw = bio_rw(bio);
1199         struct mirror_set *ms = (struct mirror_set *) ti->private;
1200         struct mirror *m = NULL;
1201         struct dm_bio_details *bd = NULL;
1202         struct dm_raid1_bio_record *bio_record =
1203           dm_per_bio_data(bio, sizeof(struct dm_raid1_bio_record));
1204
1205         /*
1206          * We need to dec pending if this was a write.
1207          */
1208         if (rw == WRITE) {
1209                 if (!(bio->bi_rw & (REQ_FLUSH | REQ_DISCARD)))
1210                         dm_rh_dec(ms->rh, bio_record->write_region);
1211                 return error;
1212         }
1213
1214         if (error == -EOPNOTSUPP)
1215                 goto out;
1216
1217         if ((error == -EWOULDBLOCK) && (bio->bi_rw & REQ_RAHEAD))
1218                 goto out;
1219
1220         if (unlikely(error)) {
1221                 if (!bio_record->details.bi_bdev) {
1222                         /*
1223                          * There wasn't enough memory to record necessary
1224                          * information for a retry or there was no other
1225                          * mirror in-sync.
1226                          */
1227                         DMERR_LIMIT("Mirror read failed.");
1228                         return -EIO;
1229                 }
1230
1231                 m = bio_record->m;
1232
1233                 DMERR("Mirror read failed from %s. Trying alternative device.",
1234                       m->dev->name);
1235
1236                 fail_mirror(m, DM_RAID1_READ_ERROR);
1237
1238                 /*
1239                  * A failed read is requeued for another attempt using an intact
1240                  * mirror.
1241                  */
1242                 if (default_ok(m) || mirror_available(ms, bio)) {
1243                         bd = &bio_record->details;
1244
1245                         dm_bio_restore(bd, bio);
1246                         bio_record->details.bi_bdev = NULL;
1247                         queue_bio(ms, bio, rw);
1248                         return DM_ENDIO_INCOMPLETE;
1249                 }
1250                 DMERR("All replicated volumes dead, failing I/O");
1251         }
1252
1253 out:
1254         bio_record->details.bi_bdev = NULL;
1255
1256         return error;
1257 }
1258
1259 static void mirror_presuspend(struct dm_target *ti)
1260 {
1261         struct mirror_set *ms = (struct mirror_set *) ti->private;
1262         struct dm_dirty_log *log = dm_rh_dirty_log(ms->rh);
1263
1264         struct bio_list holds;
1265         struct bio *bio;
1266
1267         atomic_set(&ms->suspend, 1);
1268
1269         /*
1270          * Process bios in the hold list to start recovery waiting
1271          * for bios in the hold list. After the process, no bio has
1272          * a chance to be added in the hold list because ms->suspend
1273          * is set.
1274          */
1275         spin_lock_irq(&ms->lock);
1276         holds = ms->holds;
1277         bio_list_init(&ms->holds);
1278         spin_unlock_irq(&ms->lock);
1279
1280         while ((bio = bio_list_pop(&holds)))
1281                 hold_bio(ms, bio);
1282
1283         /*
1284          * We must finish up all the work that we've
1285          * generated (i.e. recovery work).
1286          */
1287         dm_rh_stop_recovery(ms->rh);
1288
1289         wait_event(_kmirrord_recovery_stopped,
1290                    !dm_rh_recovery_in_flight(ms->rh));
1291
1292         if (log->type->presuspend && log->type->presuspend(log))
1293                 /* FIXME: need better error handling */
1294                 DMWARN("log presuspend failed");
1295
1296         /*
1297          * Now that recovery is complete/stopped and the
1298          * delayed bios are queued, we need to wait for
1299          * the worker thread to complete.  This way,
1300          * we know that all of our I/O has been pushed.
1301          */
1302         flush_workqueue(ms->kmirrord_wq);
1303 }
1304
1305 static void mirror_postsuspend(struct dm_target *ti)
1306 {
1307         struct mirror_set *ms = ti->private;
1308         struct dm_dirty_log *log = dm_rh_dirty_log(ms->rh);
1309
1310         if (log->type->postsuspend && log->type->postsuspend(log))
1311                 /* FIXME: need better error handling */
1312                 DMWARN("log postsuspend failed");
1313 }
1314
1315 static void mirror_resume(struct dm_target *ti)
1316 {
1317         struct mirror_set *ms = ti->private;
1318         struct dm_dirty_log *log = dm_rh_dirty_log(ms->rh);
1319
1320         atomic_set(&ms->suspend, 0);
1321         if (log->type->resume && log->type->resume(log))
1322                 /* FIXME: need better error handling */
1323                 DMWARN("log resume failed");
1324         dm_rh_start_recovery(ms->rh);
1325 }
1326
1327 /*
1328  * device_status_char
1329  * @m: mirror device/leg we want the status of
1330  *
1331  * We return one character representing the most severe error
1332  * we have encountered.
1333  *    A => Alive - No failures
1334  *    D => Dead - A write failure occurred leaving mirror out-of-sync
1335  *    S => Sync - A sychronization failure occurred, mirror out-of-sync
1336  *    R => Read - A read failure occurred, mirror data unaffected
1337  *
1338  * Returns: <char>
1339  */
1340 static char device_status_char(struct mirror *m)
1341 {
1342         if (!atomic_read(&(m->error_count)))
1343                 return 'A';
1344
1345         return (test_bit(DM_RAID1_FLUSH_ERROR, &(m->error_type))) ? 'F' :
1346                 (test_bit(DM_RAID1_WRITE_ERROR, &(m->error_type))) ? 'D' :
1347                 (test_bit(DM_RAID1_SYNC_ERROR, &(m->error_type))) ? 'S' :
1348                 (test_bit(DM_RAID1_READ_ERROR, &(m->error_type))) ? 'R' : 'U';
1349 }
1350
1351
1352 static int mirror_status(struct dm_target *ti, status_type_t type,
1353                          unsigned status_flags, char *result, unsigned maxlen)
1354 {
1355         unsigned int m, sz = 0;
1356         struct mirror_set *ms = (struct mirror_set *) ti->private;
1357         struct dm_dirty_log *log = dm_rh_dirty_log(ms->rh);
1358         char buffer[ms->nr_mirrors + 1];
1359
1360         switch (type) {
1361         case STATUSTYPE_INFO:
1362                 DMEMIT("%d ", ms->nr_mirrors);
1363                 for (m = 0; m < ms->nr_mirrors; m++) {
1364                         DMEMIT("%s ", ms->mirror[m].dev->name);
1365                         buffer[m] = device_status_char(&(ms->mirror[m]));
1366                 }
1367                 buffer[m] = '\0';
1368
1369                 DMEMIT("%llu/%llu 1 %s ",
1370                       (unsigned long long)log->type->get_sync_count(log),
1371                       (unsigned long long)ms->nr_regions, buffer);
1372
1373                 sz += log->type->status(log, type, result+sz, maxlen-sz);
1374
1375                 break;
1376
1377         case STATUSTYPE_TABLE:
1378                 sz = log->type->status(log, type, result, maxlen);
1379
1380                 DMEMIT("%d", ms->nr_mirrors);
1381                 for (m = 0; m < ms->nr_mirrors; m++)
1382                         DMEMIT(" %s %llu", ms->mirror[m].dev->name,
1383                                (unsigned long long)ms->mirror[m].offset);
1384
1385                 if (ms->features & DM_RAID1_HANDLE_ERRORS)
1386                         DMEMIT(" 1 handle_errors");
1387         }
1388
1389         return 0;
1390 }
1391
1392 static int mirror_iterate_devices(struct dm_target *ti,
1393                                   iterate_devices_callout_fn fn, void *data)
1394 {
1395         struct mirror_set *ms = ti->private;
1396         int ret = 0;
1397         unsigned i;
1398
1399         for (i = 0; !ret && i < ms->nr_mirrors; i++)
1400                 ret = fn(ti, ms->mirror[i].dev,
1401                          ms->mirror[i].offset, ti->len, data);
1402
1403         return ret;
1404 }
1405
1406 static struct target_type mirror_target = {
1407         .name    = "mirror",
1408         .version = {1, 13, 1},
1409         .module  = THIS_MODULE,
1410         .ctr     = mirror_ctr,
1411         .dtr     = mirror_dtr,
1412         .map     = mirror_map,
1413         .end_io  = mirror_end_io,
1414         .presuspend = mirror_presuspend,
1415         .postsuspend = mirror_postsuspend,
1416         .resume  = mirror_resume,
1417         .status  = mirror_status,
1418         .iterate_devices = mirror_iterate_devices,
1419 };
1420
1421 static int __init dm_mirror_init(void)
1422 {
1423         int r;
1424
1425         r = dm_register_target(&mirror_target);
1426         if (r < 0) {
1427                 DMERR("Failed to register mirror target");
1428                 goto bad_target;
1429         }
1430
1431         return 0;
1432
1433 bad_target:
1434         return r;
1435 }
1436
1437 static void __exit dm_mirror_exit(void)
1438 {
1439         dm_unregister_target(&mirror_target);
1440 }
1441
1442 /* Module hooks */
1443 module_init(dm_mirror_init);
1444 module_exit(dm_mirror_exit);
1445
1446 MODULE_DESCRIPTION(DM_NAME " mirror target");
1447 MODULE_AUTHOR("Joe Thornber");
1448 MODULE_LICENSE("GPL");