dlm: don't use idr_remove_all()
[~shefty/rdma-dev.git] / fs / dlm / lockspace.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "dir.h"
19 #include "lowcomms.h"
20 #include "config.h"
21 #include "memory.h"
22 #include "lock.h"
23 #include "recover.h"
24 #include "requestqueue.h"
25 #include "user.h"
26 #include "ast.h"
27
28 static int                      ls_count;
29 static struct mutex             ls_lock;
30 static struct list_head         lslist;
31 static spinlock_t               lslist_lock;
32 static struct task_struct *     scand_task;
33
34
35 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
36 {
37         ssize_t ret = len;
38         int n = simple_strtol(buf, NULL, 0);
39
40         ls = dlm_find_lockspace_local(ls->ls_local_handle);
41         if (!ls)
42                 return -EINVAL;
43
44         switch (n) {
45         case 0:
46                 dlm_ls_stop(ls);
47                 break;
48         case 1:
49                 dlm_ls_start(ls);
50                 break;
51         default:
52                 ret = -EINVAL;
53         }
54         dlm_put_lockspace(ls);
55         return ret;
56 }
57
58 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
59 {
60         ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
61         set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
62         wake_up(&ls->ls_uevent_wait);
63         return len;
64 }
65
66 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
67 {
68         return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
69 }
70
71 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
72 {
73         ls->ls_global_id = simple_strtoul(buf, NULL, 0);
74         return len;
75 }
76
77 static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
78 {
79         return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
80 }
81
82 static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
83 {
84         int val = simple_strtoul(buf, NULL, 0);
85         if (val == 1)
86                 set_bit(LSFL_NODIR, &ls->ls_flags);
87         return len;
88 }
89
90 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
91 {
92         uint32_t status = dlm_recover_status(ls);
93         return snprintf(buf, PAGE_SIZE, "%x\n", status);
94 }
95
96 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
97 {
98         return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
99 }
100
101 struct dlm_attr {
102         struct attribute attr;
103         ssize_t (*show)(struct dlm_ls *, char *);
104         ssize_t (*store)(struct dlm_ls *, const char *, size_t);
105 };
106
107 static struct dlm_attr dlm_attr_control = {
108         .attr  = {.name = "control", .mode = S_IWUSR},
109         .store = dlm_control_store
110 };
111
112 static struct dlm_attr dlm_attr_event = {
113         .attr  = {.name = "event_done", .mode = S_IWUSR},
114         .store = dlm_event_store
115 };
116
117 static struct dlm_attr dlm_attr_id = {
118         .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
119         .show  = dlm_id_show,
120         .store = dlm_id_store
121 };
122
123 static struct dlm_attr dlm_attr_nodir = {
124         .attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
125         .show  = dlm_nodir_show,
126         .store = dlm_nodir_store
127 };
128
129 static struct dlm_attr dlm_attr_recover_status = {
130         .attr  = {.name = "recover_status", .mode = S_IRUGO},
131         .show  = dlm_recover_status_show
132 };
133
134 static struct dlm_attr dlm_attr_recover_nodeid = {
135         .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
136         .show  = dlm_recover_nodeid_show
137 };
138
139 static struct attribute *dlm_attrs[] = {
140         &dlm_attr_control.attr,
141         &dlm_attr_event.attr,
142         &dlm_attr_id.attr,
143         &dlm_attr_nodir.attr,
144         &dlm_attr_recover_status.attr,
145         &dlm_attr_recover_nodeid.attr,
146         NULL,
147 };
148
149 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
150                              char *buf)
151 {
152         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
153         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
154         return a->show ? a->show(ls, buf) : 0;
155 }
156
157 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
158                               const char *buf, size_t len)
159 {
160         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
161         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
162         return a->store ? a->store(ls, buf, len) : len;
163 }
164
165 static void lockspace_kobj_release(struct kobject *k)
166 {
167         struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
168         kfree(ls);
169 }
170
171 static const struct sysfs_ops dlm_attr_ops = {
172         .show  = dlm_attr_show,
173         .store = dlm_attr_store,
174 };
175
176 static struct kobj_type dlm_ktype = {
177         .default_attrs = dlm_attrs,
178         .sysfs_ops     = &dlm_attr_ops,
179         .release       = lockspace_kobj_release,
180 };
181
182 static struct kset *dlm_kset;
183
184 static int do_uevent(struct dlm_ls *ls, int in)
185 {
186         int error;
187
188         if (in)
189                 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
190         else
191                 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
192
193         log_debug(ls, "%s the lockspace group...", in ? "joining" : "leaving");
194
195         /* dlm_controld will see the uevent, do the necessary group management
196            and then write to sysfs to wake us */
197
198         error = wait_event_interruptible(ls->ls_uevent_wait,
199                         test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
200
201         log_debug(ls, "group event done %d %d", error, ls->ls_uevent_result);
202
203         if (error)
204                 goto out;
205
206         error = ls->ls_uevent_result;
207  out:
208         if (error)
209                 log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
210                           error, ls->ls_uevent_result);
211         return error;
212 }
213
214 static int dlm_uevent(struct kset *kset, struct kobject *kobj,
215                       struct kobj_uevent_env *env)
216 {
217         struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
218
219         add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
220         return 0;
221 }
222
223 static struct kset_uevent_ops dlm_uevent_ops = {
224         .uevent = dlm_uevent,
225 };
226
227 int __init dlm_lockspace_init(void)
228 {
229         ls_count = 0;
230         mutex_init(&ls_lock);
231         INIT_LIST_HEAD(&lslist);
232         spin_lock_init(&lslist_lock);
233
234         dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
235         if (!dlm_kset) {
236                 printk(KERN_WARNING "%s: can not create kset\n", __func__);
237                 return -ENOMEM;
238         }
239         return 0;
240 }
241
242 void dlm_lockspace_exit(void)
243 {
244         kset_unregister(dlm_kset);
245 }
246
247 static struct dlm_ls *find_ls_to_scan(void)
248 {
249         struct dlm_ls *ls;
250
251         spin_lock(&lslist_lock);
252         list_for_each_entry(ls, &lslist, ls_list) {
253                 if (time_after_eq(jiffies, ls->ls_scan_time +
254                                             dlm_config.ci_scan_secs * HZ)) {
255                         spin_unlock(&lslist_lock);
256                         return ls;
257                 }
258         }
259         spin_unlock(&lslist_lock);
260         return NULL;
261 }
262
263 static int dlm_scand(void *data)
264 {
265         struct dlm_ls *ls;
266
267         while (!kthread_should_stop()) {
268                 ls = find_ls_to_scan();
269                 if (ls) {
270                         if (dlm_lock_recovery_try(ls)) {
271                                 ls->ls_scan_time = jiffies;
272                                 dlm_scan_rsbs(ls);
273                                 dlm_scan_timeout(ls);
274                                 dlm_scan_waiters(ls);
275                                 dlm_unlock_recovery(ls);
276                         } else {
277                                 ls->ls_scan_time += HZ;
278                         }
279                         continue;
280                 }
281                 schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
282         }
283         return 0;
284 }
285
286 static int dlm_scand_start(void)
287 {
288         struct task_struct *p;
289         int error = 0;
290
291         p = kthread_run(dlm_scand, NULL, "dlm_scand");
292         if (IS_ERR(p))
293                 error = PTR_ERR(p);
294         else
295                 scand_task = p;
296         return error;
297 }
298
299 static void dlm_scand_stop(void)
300 {
301         kthread_stop(scand_task);
302 }
303
304 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
305 {
306         struct dlm_ls *ls;
307
308         spin_lock(&lslist_lock);
309
310         list_for_each_entry(ls, &lslist, ls_list) {
311                 if (ls->ls_global_id == id) {
312                         ls->ls_count++;
313                         goto out;
314                 }
315         }
316         ls = NULL;
317  out:
318         spin_unlock(&lslist_lock);
319         return ls;
320 }
321
322 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
323 {
324         struct dlm_ls *ls;
325
326         spin_lock(&lslist_lock);
327         list_for_each_entry(ls, &lslist, ls_list) {
328                 if (ls->ls_local_handle == lockspace) {
329                         ls->ls_count++;
330                         goto out;
331                 }
332         }
333         ls = NULL;
334  out:
335         spin_unlock(&lslist_lock);
336         return ls;
337 }
338
339 struct dlm_ls *dlm_find_lockspace_device(int minor)
340 {
341         struct dlm_ls *ls;
342
343         spin_lock(&lslist_lock);
344         list_for_each_entry(ls, &lslist, ls_list) {
345                 if (ls->ls_device.minor == minor) {
346                         ls->ls_count++;
347                         goto out;
348                 }
349         }
350         ls = NULL;
351  out:
352         spin_unlock(&lslist_lock);
353         return ls;
354 }
355
356 void dlm_put_lockspace(struct dlm_ls *ls)
357 {
358         spin_lock(&lslist_lock);
359         ls->ls_count--;
360         spin_unlock(&lslist_lock);
361 }
362
363 static void remove_lockspace(struct dlm_ls *ls)
364 {
365         for (;;) {
366                 spin_lock(&lslist_lock);
367                 if (ls->ls_count == 0) {
368                         WARN_ON(ls->ls_create_count != 0);
369                         list_del(&ls->ls_list);
370                         spin_unlock(&lslist_lock);
371                         return;
372                 }
373                 spin_unlock(&lslist_lock);
374                 ssleep(1);
375         }
376 }
377
/*
 * Start the threads shared by all lockspaces: dlm_scand first, then
 * lowcomms.  If lowcomms fails to start, dlm_scand is stopped again.
 * Returns 0 on success or the error of the step that failed.
 */
static int threads_start(void)
{
        int error;

        error = dlm_scand_start();
        if (error) {
                log_print("cannot start dlm_scand thread %d", error);
                return error;
        }

        /* Thread for sending/receiving messages for all lockspaces */
        error = dlm_lowcomms_start();
        if (error) {
                log_print("cannot start dlm lowcomms %d", error);
                dlm_scand_stop();
                return error;
        }

        return 0;
}
402
/* Stop what threads_start() started: dlm_scand, then lowcomms. */
static void threads_stop(void)
{
        dlm_scand_stop();
        dlm_lowcomms_stop();
}
408
409 static int new_lockspace(const char *name, const char *cluster,
410                          uint32_t flags, int lvblen,
411                          const struct dlm_lockspace_ops *ops, void *ops_arg,
412                          int *ops_result, dlm_lockspace_t **lockspace)
413 {
414         struct dlm_ls *ls;
415         int i, size, error;
416         int do_unreg = 0;
417         int namelen = strlen(name);
418
419         if (namelen > DLM_LOCKSPACE_LEN)
420                 return -EINVAL;
421
422         if (!lvblen || (lvblen % 8))
423                 return -EINVAL;
424
425         if (!try_module_get(THIS_MODULE))
426                 return -EINVAL;
427
428         if (!dlm_user_daemon_available()) {
429                 log_print("dlm user daemon not available");
430                 error = -EUNATCH;
431                 goto out;
432         }
433
434         if (ops && ops_result) {
435                 if (!dlm_config.ci_recover_callbacks)
436                         *ops_result = -EOPNOTSUPP;
437                 else
438                         *ops_result = 0;
439         }
440
441         if (dlm_config.ci_recover_callbacks && cluster &&
442             strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
443                 log_print("dlm cluster name %s mismatch %s",
444                           dlm_config.ci_cluster_name, cluster);
445                 error = -EBADR;
446                 goto out;
447         }
448
449         error = 0;
450
451         spin_lock(&lslist_lock);
452         list_for_each_entry(ls, &lslist, ls_list) {
453                 WARN_ON(ls->ls_create_count <= 0);
454                 if (ls->ls_namelen != namelen)
455                         continue;
456                 if (memcmp(ls->ls_name, name, namelen))
457                         continue;
458                 if (flags & DLM_LSFL_NEWEXCL) {
459                         error = -EEXIST;
460                         break;
461                 }
462                 ls->ls_create_count++;
463                 *lockspace = ls;
464                 error = 1;
465                 break;
466         }
467         spin_unlock(&lslist_lock);
468
469         if (error)
470                 goto out;
471
472         error = -ENOMEM;
473
474         ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
475         if (!ls)
476                 goto out;
477         memcpy(ls->ls_name, name, namelen);
478         ls->ls_namelen = namelen;
479         ls->ls_lvblen = lvblen;
480         ls->ls_count = 0;
481         ls->ls_flags = 0;
482         ls->ls_scan_time = jiffies;
483
484         if (ops && dlm_config.ci_recover_callbacks) {
485                 ls->ls_ops = ops;
486                 ls->ls_ops_arg = ops_arg;
487         }
488
489         if (flags & DLM_LSFL_TIMEWARN)
490                 set_bit(LSFL_TIMEWARN, &ls->ls_flags);
491
492         /* ls_exflags are forced to match among nodes, and we don't
493            need to require all nodes to have some flags set */
494         ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
495                                     DLM_LSFL_NEWEXCL));
496
497         size = dlm_config.ci_rsbtbl_size;
498         ls->ls_rsbtbl_size = size;
499
500         ls->ls_rsbtbl = vmalloc(sizeof(struct dlm_rsbtable) * size);
501         if (!ls->ls_rsbtbl)
502                 goto out_lsfree;
503         for (i = 0; i < size; i++) {
504                 ls->ls_rsbtbl[i].keep.rb_node = NULL;
505                 ls->ls_rsbtbl[i].toss.rb_node = NULL;
506                 spin_lock_init(&ls->ls_rsbtbl[i].lock);
507         }
508
509         spin_lock_init(&ls->ls_remove_spin);
510
511         for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
512                 ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
513                                                  GFP_KERNEL);
514                 if (!ls->ls_remove_names[i])
515                         goto out_rsbtbl;
516         }
517
518         idr_init(&ls->ls_lkbidr);
519         spin_lock_init(&ls->ls_lkbidr_spin);
520
521         INIT_LIST_HEAD(&ls->ls_waiters);
522         mutex_init(&ls->ls_waiters_mutex);
523         INIT_LIST_HEAD(&ls->ls_orphans);
524         mutex_init(&ls->ls_orphans_mutex);
525         INIT_LIST_HEAD(&ls->ls_timeout);
526         mutex_init(&ls->ls_timeout_mutex);
527
528         INIT_LIST_HEAD(&ls->ls_new_rsb);
529         spin_lock_init(&ls->ls_new_rsb_spin);
530
531         INIT_LIST_HEAD(&ls->ls_nodes);
532         INIT_LIST_HEAD(&ls->ls_nodes_gone);
533         ls->ls_num_nodes = 0;
534         ls->ls_low_nodeid = 0;
535         ls->ls_total_weight = 0;
536         ls->ls_node_array = NULL;
537
538         memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
539         ls->ls_stub_rsb.res_ls = ls;
540
541         ls->ls_debug_rsb_dentry = NULL;
542         ls->ls_debug_waiters_dentry = NULL;
543
544         init_waitqueue_head(&ls->ls_uevent_wait);
545         ls->ls_uevent_result = 0;
546         init_completion(&ls->ls_members_done);
547         ls->ls_members_result = -1;
548
549         mutex_init(&ls->ls_cb_mutex);
550         INIT_LIST_HEAD(&ls->ls_cb_delay);
551
552         ls->ls_recoverd_task = NULL;
553         mutex_init(&ls->ls_recoverd_active);
554         spin_lock_init(&ls->ls_recover_lock);
555         spin_lock_init(&ls->ls_rcom_spin);
556         get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
557         ls->ls_recover_status = 0;
558         ls->ls_recover_seq = 0;
559         ls->ls_recover_args = NULL;
560         init_rwsem(&ls->ls_in_recovery);
561         init_rwsem(&ls->ls_recv_active);
562         INIT_LIST_HEAD(&ls->ls_requestqueue);
563         mutex_init(&ls->ls_requestqueue_mutex);
564         mutex_init(&ls->ls_clear_proc_locks);
565
566         ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
567         if (!ls->ls_recover_buf)
568                 goto out_lkbidr;
569
570         ls->ls_slot = 0;
571         ls->ls_num_slots = 0;
572         ls->ls_slots_size = 0;
573         ls->ls_slots = NULL;
574
575         INIT_LIST_HEAD(&ls->ls_recover_list);
576         spin_lock_init(&ls->ls_recover_list_lock);
577         idr_init(&ls->ls_recover_idr);
578         spin_lock_init(&ls->ls_recover_idr_lock);
579         ls->ls_recover_list_count = 0;
580         ls->ls_local_handle = ls;
581         init_waitqueue_head(&ls->ls_wait_general);
582         INIT_LIST_HEAD(&ls->ls_root_list);
583         init_rwsem(&ls->ls_root_sem);
584
585         spin_lock(&lslist_lock);
586         ls->ls_create_count = 1;
587         list_add(&ls->ls_list, &lslist);
588         spin_unlock(&lslist_lock);
589
590         if (flags & DLM_LSFL_FS) {
591                 error = dlm_callback_start(ls);
592                 if (error) {
593                         log_error(ls, "can't start dlm_callback %d", error);
594                         goto out_delist;
595                 }
596         }
597
598         init_waitqueue_head(&ls->ls_recover_lock_wait);
599
600         /*
601          * Once started, dlm_recoverd first looks for ls in lslist, then
602          * initializes ls_in_recovery as locked in "down" mode.  We need
603          * to wait for the wakeup from dlm_recoverd because in_recovery
604          * has to start out in down mode.
605          */
606
607         error = dlm_recoverd_start(ls);
608         if (error) {
609                 log_error(ls, "can't start dlm_recoverd %d", error);
610                 goto out_callback;
611         }
612
613         wait_event(ls->ls_recover_lock_wait,
614                    test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
615
616         ls->ls_kobj.kset = dlm_kset;
617         error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
618                                      "%s", ls->ls_name);
619         if (error)
620                 goto out_recoverd;
621         kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
622
623         /* let kobject handle freeing of ls if there's an error */
624         do_unreg = 1;
625
626         /* This uevent triggers dlm_controld in userspace to add us to the
627            group of nodes that are members of this lockspace (managed by the
628            cluster infrastructure.)  Once it's done that, it tells us who the
629            current lockspace members are (via configfs) and then tells the
630            lockspace to start running (via sysfs) in dlm_ls_start(). */
631
632         error = do_uevent(ls, 1);
633         if (error)
634                 goto out_recoverd;
635
636         wait_for_completion(&ls->ls_members_done);
637         error = ls->ls_members_result;
638         if (error)
639                 goto out_members;
640
641         dlm_create_debug_file(ls);
642
643         log_debug(ls, "join complete");
644         *lockspace = ls;
645         return 0;
646
647  out_members:
648         do_uevent(ls, 0);
649         dlm_clear_members(ls);
650         kfree(ls->ls_node_array);
651  out_recoverd:
652         dlm_recoverd_stop(ls);
653  out_callback:
654         dlm_callback_stop(ls);
655  out_delist:
656         spin_lock(&lslist_lock);
657         list_del(&ls->ls_list);
658         spin_unlock(&lslist_lock);
659         idr_destroy(&ls->ls_recover_idr);
660         kfree(ls->ls_recover_buf);
661  out_lkbidr:
662         idr_destroy(&ls->ls_lkbidr);
663         for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
664                 if (ls->ls_remove_names[i])
665                         kfree(ls->ls_remove_names[i]);
666         }
667  out_rsbtbl:
668         vfree(ls->ls_rsbtbl);
669  out_lsfree:
670         if (do_unreg)
671                 kobject_put(&ls->ls_kobj);
672         else
673                 kfree(ls);
674  out:
675         module_put(THIS_MODULE);
676         return error;
677 }
678
679 int dlm_new_lockspace(const char *name, const char *cluster,
680                       uint32_t flags, int lvblen,
681                       const struct dlm_lockspace_ops *ops, void *ops_arg,
682                       int *ops_result, dlm_lockspace_t **lockspace)
683 {
684         int error = 0;
685
686         mutex_lock(&ls_lock);
687         if (!ls_count)
688                 error = threads_start();
689         if (error)
690                 goto out;
691
692         error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
693                               ops_result, lockspace);
694         if (!error)
695                 ls_count++;
696         if (error > 0)
697                 error = 0;
698         if (!ls_count)
699                 threads_stop();
700  out:
701         mutex_unlock(&ls_lock);
702         return error;
703 }
704
705 static int lkb_idr_is_local(int id, void *p, void *data)
706 {
707         struct dlm_lkb *lkb = p;
708
709         if (!lkb->lkb_nodeid)
710                 return 1;
711         return 0;
712 }
713
/*
 * idr_for_each() callback that returns 1 for every entry, so the walk
 * reports non-zero as soon as the idr holds anything at all.
 */
static int lkb_idr_is_any(int id, void *p, void *data)
{
        return 1;
}
718
719 static int lkb_idr_free(int id, void *p, void *data)
720 {
721         struct dlm_lkb *lkb = p;
722
723         if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
724                 dlm_free_lvb(lkb->lkb_lvbptr);
725
726         dlm_free_lkb(lkb);
727         return 0;
728 }
729
730 /* NOTE: We check the lkbidr here rather than the resource table.
731    This is because there may be LKBs queued as ASTs that have been unlinked
732    from their RSBs and are pending deletion once the AST has been delivered */
733
734 static int lockspace_busy(struct dlm_ls *ls, int force)
735 {
736         int rv;
737
738         spin_lock(&ls->ls_lkbidr_spin);
739         if (force == 0) {
740                 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
741         } else if (force == 1) {
742                 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
743         } else {
744                 rv = 0;
745         }
746         spin_unlock(&ls->ls_lkbidr_spin);
747         return rv;
748 }
749
750 static int release_lockspace(struct dlm_ls *ls, int force)
751 {
752         struct dlm_rsb *rsb;
753         struct rb_node *n;
754         int i, busy, rv;
755
756         busy = lockspace_busy(ls, force);
757
758         spin_lock(&lslist_lock);
759         if (ls->ls_create_count == 1) {
760                 if (busy) {
761                         rv = -EBUSY;
762                 } else {
763                         /* remove_lockspace takes ls off lslist */
764                         ls->ls_create_count = 0;
765                         rv = 0;
766                 }
767         } else if (ls->ls_create_count > 1) {
768                 rv = --ls->ls_create_count;
769         } else {
770                 rv = -EINVAL;
771         }
772         spin_unlock(&lslist_lock);
773
774         if (rv) {
775                 log_debug(ls, "release_lockspace no remove %d", rv);
776                 return rv;
777         }
778
779         dlm_device_deregister(ls);
780
781         if (force < 3 && dlm_user_daemon_available())
782                 do_uevent(ls, 0);
783
784         dlm_recoverd_stop(ls);
785
786         dlm_callback_stop(ls);
787
788         remove_lockspace(ls);
789
790         dlm_delete_debug_file(ls);
791
792         kfree(ls->ls_recover_buf);
793
794         /*
795          * Free all lkb's in idr
796          */
797
798         idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
799         idr_destroy(&ls->ls_lkbidr);
800
801         /*
802          * Free all rsb's on rsbtbl[] lists
803          */
804
805         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
806                 while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
807                         rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
808                         rb_erase(n, &ls->ls_rsbtbl[i].keep);
809                         dlm_free_rsb(rsb);
810                 }
811
812                 while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
813                         rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
814                         rb_erase(n, &ls->ls_rsbtbl[i].toss);
815                         dlm_free_rsb(rsb);
816                 }
817         }
818
819         vfree(ls->ls_rsbtbl);
820
821         for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
822                 kfree(ls->ls_remove_names[i]);
823
824         while (!list_empty(&ls->ls_new_rsb)) {
825                 rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
826                                        res_hashchain);
827                 list_del(&rsb->res_hashchain);
828                 dlm_free_rsb(rsb);
829         }
830
831         /*
832          * Free structures on any other lists
833          */
834
835         dlm_purge_requestqueue(ls);
836         kfree(ls->ls_recover_args);
837         dlm_clear_members(ls);
838         dlm_clear_members_gone(ls);
839         kfree(ls->ls_node_array);
840         log_debug(ls, "release_lockspace final free");
841         kobject_put(&ls->ls_kobj);
842         /* The ls structure will be freed when the kobject is done with */
843
844         module_put(THIS_MODULE);
845         return 0;
846 }
847
848 /*
849  * Called when a system has released all its locks and is not going to use the
850  * lockspace any longer.  We free everything we're managing for this lockspace.
851  * Remaining nodes will go through the recovery process as if we'd died.  The
852  * lockspace must continue to function as usual, participating in recoveries,
853  * until this returns.
854  *
855  * Force has 4 possible values:
856  * 0 - don't destroy locksapce if it has any LKBs
857  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
858  * 2 - destroy lockspace regardless of LKBs
859  * 3 - destroy lockspace as part of a forced shutdown
860  */
861
862 int dlm_release_lockspace(void *lockspace, int force)
863 {
864         struct dlm_ls *ls;
865         int error;
866
867         ls = dlm_find_lockspace_local(lockspace);
868         if (!ls)
869                 return -EINVAL;
870         dlm_put_lockspace(ls);
871
872         mutex_lock(&ls_lock);
873         error = release_lockspace(ls, force);
874         if (!error)
875                 ls_count--;
876         if (!ls_count)
877                 threads_stop();
878         mutex_unlock(&ls_lock);
879
880         return error;
881 }
882
883 void dlm_stop_lockspaces(void)
884 {
885         struct dlm_ls *ls;
886
887  restart:
888         spin_lock(&lslist_lock);
889         list_for_each_entry(ls, &lslist, ls_list) {
890                 if (!test_bit(LSFL_RUNNING, &ls->ls_flags))
891                         continue;
892                 spin_unlock(&lslist_lock);
893                 log_error(ls, "no userland control daemon, stopping lockspace");
894                 dlm_ls_stop(ls);
895                 goto restart;
896         }
897         spin_unlock(&lslist_lock);
898 }
899