b56950758188d4b72f4ed2e193047ab1aded8c8b
[~shefty/rdma-dev.git] / fs / dlm / lock.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12
13 /* Central locking logic has four stages:
14
15    dlm_lock()
16    dlm_unlock()
17
18    request_lock(ls, lkb)
19    convert_lock(ls, lkb)
20    unlock_lock(ls, lkb)
21    cancel_lock(ls, lkb)
22
23    _request_lock(r, lkb)
24    _convert_lock(r, lkb)
25    _unlock_lock(r, lkb)
26    _cancel_lock(r, lkb)
27
28    do_request(r, lkb)
29    do_convert(r, lkb)
30    do_unlock(r, lkb)
31    do_cancel(r, lkb)
32
33    Stage 1 (lock, unlock) is mainly about checking input args and
34    splitting into one of the four main operations:
35
36        dlm_lock          = request_lock
37        dlm_lock+CONVERT  = convert_lock
38        dlm_unlock        = unlock_lock
39        dlm_unlock+CANCEL = cancel_lock
40
41    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42    provided to the next stage.
43
44    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45    When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
48    given rsb and lkb and queues callbacks.
49
50    For remote operations, send_xxxx() results in the corresponding do_xxxx()
51    function being executed on the remote node.  The connecting send/receive
52    calls on local (L) and remote (R) nodes:
53
54    L: send_xxxx()              ->  R: receive_xxxx()
55                                    R: do_xxxx()
56    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
57 */
58 #include <linux/types.h>
59 #include <linux/rbtree.h>
60 #include <linux/slab.h>
61 #include "dlm_internal.h"
62 #include <linux/dlm_device.h>
63 #include "memory.h"
64 #include "lowcomms.h"
65 #include "requestqueue.h"
66 #include "util.h"
67 #include "dir.h"
68 #include "member.h"
69 #include "lockspace.h"
70 #include "ast.h"
71 #include "lock.h"
72 #include "rcom.h"
73 #include "recover.h"
74 #include "lvb_table.h"
75 #include "user.h"
76 #include "config.h"
77
78 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
82 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
84 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int send_remove(struct dlm_rsb *r);
86 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
87 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
88 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
89                                     struct dlm_message *ms);
90 static int receive_extralen(struct dlm_message *ms);
91 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
92 static void del_timeout(struct dlm_lkb *lkb);
93 static void toss_rsb(struct kref *kref);
94
95 /*
96  * Lock compatibilty matrix - thanks Steve
97  * UN = Unlocked state. Not really a state, used as a flag
98  * PD = Padding. Used to make the matrix a nice power of two in size
99  * Other states are the same as the VMS DLM.
100  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
101  */
102
103 static const int __dlm_compat_matrix[8][8] = {
104       /* UN NL CR CW PR PW EX PD */
105         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
106         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
107         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
108         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
109         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
110         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
111         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
112         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
113 };
114
115 /*
116  * This defines the direction of transfer of LVB data.
117  * Granted mode is the row; requested mode is the column.
118  * Usage: matrix[grmode+1][rqmode+1]
119  * 1 = LVB is returned to the caller
120  * 0 = LVB is written to the resource
121  * -1 = nothing happens to the LVB
122  */
123
124 const int dlm_lvb_operations[8][8] = {
125         /* UN   NL  CR  CW  PR  PW  EX  PD*/
126         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
127         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
128         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
129         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
130         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
131         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
132         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
133         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
134 };
135
136 #define modes_compat(gr, rq) \
137         __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
138
139 int dlm_modes_compat(int mode1, int mode2)
140 {
141         return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
142 }
143
144 /*
145  * Compatibility matrix for conversions with QUECVT set.
146  * Granted mode is the row; requested mode is the column.
147  * Usage: matrix[grmode+1][rqmode+1]
148  */
149
150 static const int __quecvt_compat_matrix[8][8] = {
151       /* UN NL CR CW PR PW EX PD */
152         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
153         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
154         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
155         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
156         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
157         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
158         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
159         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
160 };
161
162 void dlm_print_lkb(struct dlm_lkb *lkb)
163 {
164         printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
165                "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
166                lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
167                lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
168                lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
169                (unsigned long long)lkb->lkb_recover_seq);
170 }
171
172 static void dlm_print_rsb(struct dlm_rsb *r)
173 {
174         printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
175                "rlc %d name %s\n",
176                r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
177                r->res_flags, r->res_first_lkid, r->res_recover_locks_count,
178                r->res_name);
179 }
180
181 void dlm_dump_rsb(struct dlm_rsb *r)
182 {
183         struct dlm_lkb *lkb;
184
185         dlm_print_rsb(r);
186
187         printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
188                list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
189         printk(KERN_ERR "rsb lookup list\n");
190         list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
191                 dlm_print_lkb(lkb);
192         printk(KERN_ERR "rsb grant queue:\n");
193         list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
194                 dlm_print_lkb(lkb);
195         printk(KERN_ERR "rsb convert queue:\n");
196         list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
197                 dlm_print_lkb(lkb);
198         printk(KERN_ERR "rsb wait queue:\n");
199         list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
200                 dlm_print_lkb(lkb);
201 }
202
203 /* Threads cannot use the lockspace while it's being recovered */
204
205 static inline void dlm_lock_recovery(struct dlm_ls *ls)
206 {
207         down_read(&ls->ls_in_recovery);
208 }
209
210 void dlm_unlock_recovery(struct dlm_ls *ls)
211 {
212         up_read(&ls->ls_in_recovery);
213 }
214
215 int dlm_lock_recovery_try(struct dlm_ls *ls)
216 {
217         return down_read_trylock(&ls->ls_in_recovery);
218 }
219
220 static inline int can_be_queued(struct dlm_lkb *lkb)
221 {
222         return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
223 }
224
225 static inline int force_blocking_asts(struct dlm_lkb *lkb)
226 {
227         return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
228 }
229
230 static inline int is_demoted(struct dlm_lkb *lkb)
231 {
232         return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
233 }
234
235 static inline int is_altmode(struct dlm_lkb *lkb)
236 {
237         return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
238 }
239
240 static inline int is_granted(struct dlm_lkb *lkb)
241 {
242         return (lkb->lkb_status == DLM_LKSTS_GRANTED);
243 }
244
245 static inline int is_remote(struct dlm_rsb *r)
246 {
247         DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
248         return !!r->res_nodeid;
249 }
250
251 static inline int is_process_copy(struct dlm_lkb *lkb)
252 {
253         return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
254 }
255
256 static inline int is_master_copy(struct dlm_lkb *lkb)
257 {
258         return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
259 }
260
261 static inline int middle_conversion(struct dlm_lkb *lkb)
262 {
263         if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
264             (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
265                 return 1;
266         return 0;
267 }
268
269 static inline int down_conversion(struct dlm_lkb *lkb)
270 {
271         return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
272 }
273
274 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
275 {
276         return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
277 }
278
279 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
280 {
281         return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
282 }
283
284 static inline int is_overlap(struct dlm_lkb *lkb)
285 {
286         return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
287                                   DLM_IFL_OVERLAP_CANCEL));
288 }
289
290 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
291 {
292         if (is_master_copy(lkb))
293                 return;
294
295         del_timeout(lkb);
296
297         DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
298
299         /* if the operation was a cancel, then return -DLM_ECANCEL, if a
300            timeout caused the cancel then return -ETIMEDOUT */
301         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
302                 lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
303                 rv = -ETIMEDOUT;
304         }
305
306         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
307                 lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
308                 rv = -EDEADLK;
309         }
310
311         dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
312 }
313
314 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
315 {
316         queue_cast(r, lkb,
317                    is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
318 }
319
320 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
321 {
322         if (is_master_copy(lkb)) {
323                 send_bast(r, lkb, rqmode);
324         } else {
325                 dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
326         }
327 }
328
329 /*
330  * Basic operations on rsb's and lkb's
331  */
332
333 /* This is only called to add a reference when the code already holds
334    a valid reference to the rsb, so there's no need for locking. */
335
336 static inline void hold_rsb(struct dlm_rsb *r)
337 {
338         kref_get(&r->res_ref);
339 }
340
341 void dlm_hold_rsb(struct dlm_rsb *r)
342 {
343         hold_rsb(r);
344 }
345
346 /* When all references to the rsb are gone it's transferred to
347    the tossed list for later disposal. */
348
349 static void put_rsb(struct dlm_rsb *r)
350 {
351         struct dlm_ls *ls = r->res_ls;
352         uint32_t bucket = r->res_bucket;
353
354         spin_lock(&ls->ls_rsbtbl[bucket].lock);
355         kref_put(&r->res_ref, toss_rsb);
356         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
357 }
358
359 void dlm_put_rsb(struct dlm_rsb *r)
360 {
361         put_rsb(r);
362 }
363
364 static int pre_rsb_struct(struct dlm_ls *ls)
365 {
366         struct dlm_rsb *r1, *r2;
367         int count = 0;
368
369         spin_lock(&ls->ls_new_rsb_spin);
370         if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
371                 spin_unlock(&ls->ls_new_rsb_spin);
372                 return 0;
373         }
374         spin_unlock(&ls->ls_new_rsb_spin);
375
376         r1 = dlm_allocate_rsb(ls);
377         r2 = dlm_allocate_rsb(ls);
378
379         spin_lock(&ls->ls_new_rsb_spin);
380         if (r1) {
381                 list_add(&r1->res_hashchain, &ls->ls_new_rsb);
382                 ls->ls_new_rsb_count++;
383         }
384         if (r2) {
385                 list_add(&r2->res_hashchain, &ls->ls_new_rsb);
386                 ls->ls_new_rsb_count++;
387         }
388         count = ls->ls_new_rsb_count;
389         spin_unlock(&ls->ls_new_rsb_spin);
390
391         if (!count)
392                 return -ENOMEM;
393         return 0;
394 }
395
396 /* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
397    unlock any spinlocks, go back and call pre_rsb_struct again.
398    Otherwise, take an rsb off the list and return it. */
399
400 static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
401                           struct dlm_rsb **r_ret)
402 {
403         struct dlm_rsb *r;
404         int count;
405
406         spin_lock(&ls->ls_new_rsb_spin);
407         if (list_empty(&ls->ls_new_rsb)) {
408                 count = ls->ls_new_rsb_count;
409                 spin_unlock(&ls->ls_new_rsb_spin);
410                 log_debug(ls, "find_rsb retry %d %d %s",
411                           count, dlm_config.ci_new_rsb_count, name);
412                 return -EAGAIN;
413         }
414
415         r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
416         list_del(&r->res_hashchain);
417         /* Convert the empty list_head to a NULL rb_node for tree usage: */
418         memset(&r->res_hashnode, 0, sizeof(struct rb_node));
419         ls->ls_new_rsb_count--;
420         spin_unlock(&ls->ls_new_rsb_spin);
421
422         r->res_ls = ls;
423         r->res_length = len;
424         memcpy(r->res_name, name, len);
425         mutex_init(&r->res_mutex);
426
427         INIT_LIST_HEAD(&r->res_lookup);
428         INIT_LIST_HEAD(&r->res_grantqueue);
429         INIT_LIST_HEAD(&r->res_convertqueue);
430         INIT_LIST_HEAD(&r->res_waitqueue);
431         INIT_LIST_HEAD(&r->res_root_list);
432         INIT_LIST_HEAD(&r->res_recover_list);
433
434         *r_ret = r;
435         return 0;
436 }
437
438 static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
439 {
440         char maxname[DLM_RESNAME_MAXLEN];
441
442         memset(maxname, 0, DLM_RESNAME_MAXLEN);
443         memcpy(maxname, name, nlen);
444         return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN);
445 }
446
447 int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len,
448                         struct dlm_rsb **r_ret)
449 {
450         struct rb_node *node = tree->rb_node;
451         struct dlm_rsb *r;
452         int rc;
453
454         while (node) {
455                 r = rb_entry(node, struct dlm_rsb, res_hashnode);
456                 rc = rsb_cmp(r, name, len);
457                 if (rc < 0)
458                         node = node->rb_left;
459                 else if (rc > 0)
460                         node = node->rb_right;
461                 else
462                         goto found;
463         }
464         *r_ret = NULL;
465         return -EBADR;
466
467  found:
468         *r_ret = r;
469         return 0;
470 }
471
472 static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
473 {
474         struct rb_node **newn = &tree->rb_node;
475         struct rb_node *parent = NULL;
476         int rc;
477
478         while (*newn) {
479                 struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb,
480                                                res_hashnode);
481
482                 parent = *newn;
483                 rc = rsb_cmp(cur, rsb->res_name, rsb->res_length);
484                 if (rc < 0)
485                         newn = &parent->rb_left;
486                 else if (rc > 0)
487                         newn = &parent->rb_right;
488                 else {
489                         log_print("rsb_insert match");
490                         dlm_dump_rsb(rsb);
491                         dlm_dump_rsb(cur);
492                         return -EEXIST;
493                 }
494         }
495
496         rb_link_node(&rsb->res_hashnode, parent, newn);
497         rb_insert_color(&rsb->res_hashnode, tree);
498         return 0;
499 }
500
501 /*
502  * Find rsb in rsbtbl and potentially create/add one
503  *
504  * Delaying the release of rsb's has a similar benefit to applications keeping
505  * NL locks on an rsb, but without the guarantee that the cached master value
506  * will still be valid when the rsb is reused.  Apps aren't always smart enough
507  * to keep NL locks on an rsb that they may lock again shortly; this can lead
508  * to excessive master lookups and removals if we don't delay the release.
509  *
510  * Searching for an rsb means looking through both the normal list and toss
511  * list.  When found on the toss list the rsb is moved to the normal list with
512  * ref count of 1; when found on normal list the ref count is incremented.
513  *
514  * rsb's on the keep list are being used locally and refcounted.
515  * rsb's on the toss list are not being used locally, and are not refcounted.
516  *
517  * The toss list rsb's were either
518  * - previously used locally but not any more (were on keep list, then
519  *   moved to toss list when last refcount dropped)
520  * - created and put on toss list as a directory record for a lookup
521  *   (we are the dir node for the res, but are not using the res right now,
522  *   but some other node is)
523  *
524  * The purpose of find_rsb() is to return a refcounted rsb for local use.
525  * So, if the given rsb is on the toss list, it is moved to the keep list
526  * before being returned.
527  *
528  * toss_rsb() happens when all local usage of the rsb is done, i.e. no
529  * more refcounts exist, so the rsb is moved from the keep list to the
530  * toss list.
531  *
532  * rsb's on both keep and toss lists are used for doing a name to master
533  * lookups.  rsb's that are in use locally (and being refcounted) are on
534  * the keep list, rsb's that are not in use locally (not refcounted) and
535  * only exist for name/master lookups are on the toss list.
536  *
537  * rsb's on the toss list who's dir_nodeid is not local can have stale
538  * name/master mappings.  So, remote requests on such rsb's can potentially
539  * return with an error, which means the mapping is stale and needs to
540  * be updated with a new lookup.  (The idea behind MASTER UNCERTAIN and
541  * first_lkid is to keep only a single outstanding request on an rsb
542  * while that rsb has a potentially stale master.)
543  */
544
545 static int find_rsb_dir(struct dlm_ls *ls, char *name, int len,
546                         uint32_t hash, uint32_t b,
547                         int dir_nodeid, int from_nodeid,
548                         unsigned int flags, struct dlm_rsb **r_ret)
549 {
550         struct dlm_rsb *r = NULL;
551         int our_nodeid = dlm_our_nodeid();
552         int from_local = 0;
553         int from_other = 0;
554         int from_dir = 0;
555         int create = 0;
556         int error;
557
558         if (flags & R_RECEIVE_REQUEST) {
559                 if (from_nodeid == dir_nodeid)
560                         from_dir = 1;
561                 else
562                         from_other = 1;
563         } else if (flags & R_REQUEST) {
564                 from_local = 1;
565         }
566
567         /*
568          * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so
569          * from_nodeid has sent us a lock in dlm_recover_locks, believing
570          * we're the new master.  Our local recovery may not have set
571          * res_master_nodeid to our_nodeid yet, so allow either.  Don't
572          * create the rsb; dlm_recover_process_copy() will handle EBADR
573          * by resending.
574          *
575          * If someone sends us a request, we are the dir node, and we do
576          * not find the rsb anywhere, then recreate it.  This happens if
577          * someone sends us a request after we have removed/freed an rsb
578          * from our toss list.  (They sent a request instead of lookup
579          * because they are using an rsb from their toss list.)
580          */
581
582         if (from_local || from_dir ||
583             (from_other && (dir_nodeid == our_nodeid))) {
584                 create = 1;
585         }
586
587  retry:
588         if (create) {
589                 error = pre_rsb_struct(ls);
590                 if (error < 0)
591                         goto out;
592         }
593
594         spin_lock(&ls->ls_rsbtbl[b].lock);
595
596         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
597         if (error)
598                 goto do_toss;
599         
600         /*
601          * rsb is active, so we can't check master_nodeid without lock_rsb.
602          */
603
604         kref_get(&r->res_ref);
605         error = 0;
606         goto out_unlock;
607
608
609  do_toss:
610         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
611         if (error)
612                 goto do_new;
613
614         /*
615          * rsb found inactive (master_nodeid may be out of date unless
616          * we are the dir_nodeid or were the master)  No other thread
617          * is using this rsb because it's on the toss list, so we can
618          * look at or update res_master_nodeid without lock_rsb.
619          */
620
621         if ((r->res_master_nodeid != our_nodeid) && from_other) {
622                 /* our rsb was not master, and another node (not the dir node)
623                    has sent us a request */
624                 log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s",
625                           from_nodeid, r->res_master_nodeid, dir_nodeid,
626                           r->res_name);
627                 error = -ENOTBLK;
628                 goto out_unlock;
629         }
630
631         if ((r->res_master_nodeid != our_nodeid) && from_dir) {
632                 /* don't think this should ever happen */
633                 log_error(ls, "find_rsb toss from_dir %d master %d",
634                           from_nodeid, r->res_master_nodeid);
635                 dlm_print_rsb(r);
636                 /* fix it and go on */
637                 r->res_master_nodeid = our_nodeid;
638                 r->res_nodeid = 0;
639                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
640                 r->res_first_lkid = 0;
641         }
642
643         if (from_local && (r->res_master_nodeid != our_nodeid)) {
644                 /* Because we have held no locks on this rsb,
645                    res_master_nodeid could have become stale. */
646                 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
647                 r->res_first_lkid = 0;
648         }
649
650         rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
651         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
652         goto out_unlock;
653
654
655  do_new:
656         /*
657          * rsb not found
658          */
659
660         if (error == -EBADR && !create)
661                 goto out_unlock;
662
663         error = get_rsb_struct(ls, name, len, &r);
664         if (error == -EAGAIN) {
665                 spin_unlock(&ls->ls_rsbtbl[b].lock);
666                 goto retry;
667         }
668         if (error)
669                 goto out_unlock;
670
671         r->res_hash = hash;
672         r->res_bucket = b;
673         r->res_dir_nodeid = dir_nodeid;
674         kref_init(&r->res_ref);
675
676         if (from_dir) {
677                 /* want to see how often this happens */
678                 log_debug(ls, "find_rsb new from_dir %d recreate %s",
679                           from_nodeid, r->res_name);
680                 r->res_master_nodeid = our_nodeid;
681                 r->res_nodeid = 0;
682                 goto out_add;
683         }
684
685         if (from_other && (dir_nodeid != our_nodeid)) {
686                 /* should never happen */
687                 log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
688                           from_nodeid, dir_nodeid, our_nodeid, r->res_name);
689                 dlm_free_rsb(r);
690                 error = -ENOTBLK;
691                 goto out_unlock;
692         }
693
694         if (from_other) {
695                 log_debug(ls, "find_rsb new from_other %d dir %d %s",
696                           from_nodeid, dir_nodeid, r->res_name);
697         }
698
699         if (dir_nodeid == our_nodeid) {
700                 /* When we are the dir nodeid, we can set the master
701                    node immediately */
702                 r->res_master_nodeid = our_nodeid;
703                 r->res_nodeid = 0;
704         } else {
705                 /* set_master will send_lookup to dir_nodeid */
706                 r->res_master_nodeid = 0;
707                 r->res_nodeid = -1;
708         }
709
710  out_add:
711         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
712  out_unlock:
713         spin_unlock(&ls->ls_rsbtbl[b].lock);
714  out:
715         *r_ret = r;
716         return error;
717 }
718
719 /* During recovery, other nodes can send us new MSTCPY locks (from
720    dlm_recover_locks) before we've made ourself master (in
721    dlm_recover_masters). */
722
723 static int find_rsb_nodir(struct dlm_ls *ls, char *name, int len,
724                           uint32_t hash, uint32_t b,
725                           int dir_nodeid, int from_nodeid,
726                           unsigned int flags, struct dlm_rsb **r_ret)
727 {
728         struct dlm_rsb *r = NULL;
729         int our_nodeid = dlm_our_nodeid();
730         int recover = (flags & R_RECEIVE_RECOVER);
731         int error;
732
733  retry:
734         error = pre_rsb_struct(ls);
735         if (error < 0)
736                 goto out;
737
738         spin_lock(&ls->ls_rsbtbl[b].lock);
739
740         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
741         if (error)
742                 goto do_toss;
743
744         /*
745          * rsb is active, so we can't check master_nodeid without lock_rsb.
746          */
747
748         kref_get(&r->res_ref);
749         goto out_unlock;
750
751
752  do_toss:
753         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
754         if (error)
755                 goto do_new;
756
757         /*
758          * rsb found inactive. No other thread is using this rsb because
759          * it's on the toss list, so we can look at or update
760          * res_master_nodeid without lock_rsb.
761          */
762
763         if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
764                 /* our rsb is not master, and another node has sent us a
765                    request; this should never happen */
766                 log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d",
767                           from_nodeid, r->res_master_nodeid, dir_nodeid);
768                 dlm_print_rsb(r);
769                 error = -ENOTBLK;
770                 goto out_unlock;
771         }
772
773         if (!recover && (r->res_master_nodeid != our_nodeid) &&
774             (dir_nodeid == our_nodeid)) {
775                 /* our rsb is not master, and we are dir; may as well fix it;
776                    this should never happen */
777                 log_error(ls, "find_rsb toss our %d master %d dir %d",
778                           our_nodeid, r->res_master_nodeid, dir_nodeid);
779                 dlm_print_rsb(r);
780                 r->res_master_nodeid = our_nodeid;
781                 r->res_nodeid = 0;
782         }
783
784         rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
785         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
786         goto out_unlock;
787
788
789  do_new:
790         /*
791          * rsb not found
792          */
793
794         error = get_rsb_struct(ls, name, len, &r);
795         if (error == -EAGAIN) {
796                 spin_unlock(&ls->ls_rsbtbl[b].lock);
797                 goto retry;
798         }
799         if (error)
800                 goto out_unlock;
801
802         r->res_hash = hash;
803         r->res_bucket = b;
804         r->res_dir_nodeid = dir_nodeid;
805         r->res_master_nodeid = dir_nodeid;
806         r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
807         kref_init(&r->res_ref);
808
809         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
810  out_unlock:
811         spin_unlock(&ls->ls_rsbtbl[b].lock);
812  out:
813         *r_ret = r;
814         return error;
815 }
816
817 static int find_rsb(struct dlm_ls *ls, char *name, int len, int from_nodeid,
818                     unsigned int flags, struct dlm_rsb **r_ret)
819 {
820         uint32_t hash, b;
821         int dir_nodeid;
822
823         if (len > DLM_RESNAME_MAXLEN)
824                 return -EINVAL;
825
826         hash = jhash(name, len, 0);
827         b = hash & (ls->ls_rsbtbl_size - 1);
828
829         dir_nodeid = dlm_hash2nodeid(ls, hash);
830
831         if (dlm_no_directory(ls))
832                 return find_rsb_nodir(ls, name, len, hash, b, dir_nodeid,
833                                       from_nodeid, flags, r_ret);
834         else
835                 return find_rsb_dir(ls, name, len, hash, b, dir_nodeid,
836                                       from_nodeid, flags, r_ret);
837 }
838
839 /* we have received a request and found that res_master_nodeid != our_nodeid,
840    so we need to return an error or make ourself the master */
841
842 static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
843                                   int from_nodeid)
844 {
845         if (dlm_no_directory(ls)) {
846                 log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
847                           from_nodeid, r->res_master_nodeid,
848                           r->res_dir_nodeid);
849                 dlm_print_rsb(r);
850                 return -ENOTBLK;
851         }
852
853         if (from_nodeid != r->res_dir_nodeid) {
854                 /* our rsb is not master, and another node (not the dir node)
855                    has sent us a request.  this is much more common when our
856                    master_nodeid is zero, so limit debug to non-zero.  */
857
858                 if (r->res_master_nodeid) {
859                         log_debug(ls, "validate master from_other %d master %d "
860                                   "dir %d first %x %s", from_nodeid,
861                                   r->res_master_nodeid, r->res_dir_nodeid,
862                                   r->res_first_lkid, r->res_name);
863                 }
864                 return -ENOTBLK;
865         } else {
866                 /* our rsb is not master, but the dir nodeid has sent us a
867                    request; this could happen with master 0 / res_nodeid -1 */
868
869                 if (r->res_master_nodeid) {
870                         log_error(ls, "validate master from_dir %d master %d "
871                                   "first %x %s",
872                                   from_nodeid, r->res_master_nodeid,
873                                   r->res_first_lkid, r->res_name);
874                 }
875
876                 r->res_master_nodeid = dlm_our_nodeid();
877                 r->res_nodeid = 0;
878                 return 0;
879         }
880 }
881
882 /*
883  * We're the dir node for this res and another node wants to know the
884  * master nodeid.  During normal operation (non recovery) this is only
885  * called from receive_lookup(); master lookups when the local node is
886  * the dir node are done by find_rsb().
887  *
888  * normal operation, we are the dir node for a resource
889  * . _request_lock
890  * . set_master
891  * . send_lookup
892  * . receive_lookup
893  * . dlm_master_lookup flags 0
894  *
895  * recover directory, we are rebuilding dir for all resources
896  * . dlm_recover_directory
897  * . dlm_rcom_names
898  *   remote node sends back the rsb names it is master of and we are dir of
899  * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1)
900  *   we either create new rsb setting remote node as master, or find existing
901  *   rsb and set master to be the remote node.
902  *
903  * recover masters, we are finding the new master for resources
904  * . dlm_recover_masters
905  * . recover_master
906  * . dlm_send_rcom_lookup
907  * . receive_rcom_lookup
908  * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
909  */
910
911 int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, char *name, int len,
912                       unsigned int flags, int *r_nodeid, int *result)
913 {
914         struct dlm_rsb *r = NULL;
915         uint32_t hash, b;
916         int from_master = (flags & DLM_LU_RECOVER_DIR);
917         int fix_master = (flags & DLM_LU_RECOVER_MASTER);
918         int our_nodeid = dlm_our_nodeid();
919         int dir_nodeid, error, toss_list = 0;
920
921         if (len > DLM_RESNAME_MAXLEN)
922                 return -EINVAL;
923
924         if (from_nodeid == our_nodeid) {
925                 log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x",
926                           our_nodeid, flags);
927                 return -EINVAL;
928         }
929
930         hash = jhash(name, len, 0);
931         b = hash & (ls->ls_rsbtbl_size - 1);
932
933         dir_nodeid = dlm_hash2nodeid(ls, hash);
934         if (dir_nodeid != our_nodeid) {
935                 log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
936                           from_nodeid, dir_nodeid, our_nodeid, hash,
937                           ls->ls_num_nodes);
938                 *r_nodeid = -1;
939                 return -EINVAL;
940         }
941
942  retry:
943         error = pre_rsb_struct(ls);
944         if (error < 0)
945                 return error;
946
947         spin_lock(&ls->ls_rsbtbl[b].lock);
948         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
949         if (!error) {
950                 /* because the rsb is active, we need to lock_rsb before
951                    checking/changing re_master_nodeid */
952
953                 hold_rsb(r);
954                 spin_unlock(&ls->ls_rsbtbl[b].lock);
955                 lock_rsb(r);
956                 goto found;
957         }
958
959         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
960         if (error)
961                 goto not_found;
962
963         /* because the rsb is inactive (on toss list), it's not refcounted
964            and lock_rsb is not used, but is protected by the rsbtbl lock */
965
966         toss_list = 1;
967  found:
968         if (r->res_dir_nodeid != our_nodeid) {
969                 /* should not happen, but may as well fix it and carry on */
970                 log_error(ls, "dlm_master_lookup res_dir %d our %d %s",
971                           r->res_dir_nodeid, our_nodeid, r->res_name);
972                 r->res_dir_nodeid = our_nodeid;
973         }
974
975         if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) {
976                 /* Recovery uses this function to set a new master when
977                    the previous master failed.  Setting NEW_MASTER will
978                    force dlm_recover_masters to call recover_master on this
979                    rsb even though the res_nodeid is no longer removed. */
980
981                 r->res_master_nodeid = from_nodeid;
982                 r->res_nodeid = from_nodeid;
983                 rsb_set_flag(r, RSB_NEW_MASTER);
984
985                 if (toss_list) {
986                         /* I don't think we should ever find it on toss list. */
987                         log_error(ls, "dlm_master_lookup fix_master on toss");
988                         dlm_dump_rsb(r);
989                 }
990         }
991
992         if (from_master && (r->res_master_nodeid != from_nodeid)) {
993                 /* this will happen if from_nodeid became master during
994                    a previous recovery cycle, and we aborted the previous
995                    cycle before recovering this master value */
996
997                 log_limit(ls, "dlm_master_lookup from_master %d "
998                           "master_nodeid %d res_nodeid %d first %x %s",
999                           from_nodeid, r->res_master_nodeid, r->res_nodeid,
1000                           r->res_first_lkid, r->res_name);
1001
1002                 if (r->res_master_nodeid == our_nodeid) {
1003                         log_error(ls, "from_master %d our_master", from_nodeid);
1004                         dlm_dump_rsb(r);
1005                         dlm_send_rcom_lookup_dump(r, from_nodeid);
1006                         goto out_found;
1007                 }
1008
1009                 r->res_master_nodeid = from_nodeid;
1010                 r->res_nodeid = from_nodeid;
1011                 rsb_set_flag(r, RSB_NEW_MASTER);
1012         }
1013
1014         if (!r->res_master_nodeid) {
1015                 /* this will happen if recovery happens while we're looking
1016                    up the master for this rsb */
1017
1018                 log_debug(ls, "dlm_master_lookup master 0 to %d first %x %s",
1019                           from_nodeid, r->res_first_lkid, r->res_name);
1020                 r->res_master_nodeid = from_nodeid;
1021                 r->res_nodeid = from_nodeid;
1022         }
1023
1024         if (!from_master && !fix_master &&
1025             (r->res_master_nodeid == from_nodeid)) {
1026                 /* this can happen when the master sends remove, the dir node
1027                    finds the rsb on the keep list and ignores the remove,
1028                    and the former master sends a lookup */
1029
1030                 log_limit(ls, "dlm_master_lookup from master %d flags %x "
1031                           "first %x %s", from_nodeid, flags,
1032                           r->res_first_lkid, r->res_name);
1033         }
1034
1035  out_found:
1036         *r_nodeid = r->res_master_nodeid;
1037         if (result)
1038                 *result = DLM_LU_MATCH;
1039
1040         if (toss_list) {
1041                 r->res_toss_time = jiffies;
1042                 /* the rsb was inactive (on toss list) */
1043                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1044         } else {
1045                 /* the rsb was active */
1046                 unlock_rsb(r);
1047                 put_rsb(r);
1048         }
1049         return 0;
1050
1051  not_found:
1052         error = get_rsb_struct(ls, name, len, &r);
1053         if (error == -EAGAIN) {
1054                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1055                 goto retry;
1056         }
1057         if (error)
1058                 goto out_unlock;
1059
1060         r->res_hash = hash;
1061         r->res_bucket = b;
1062         r->res_dir_nodeid = our_nodeid;
1063         r->res_master_nodeid = from_nodeid;
1064         r->res_nodeid = from_nodeid;
1065         kref_init(&r->res_ref);
1066         r->res_toss_time = jiffies;
1067
1068         error = rsb_insert(r, &ls->ls_rsbtbl[b].toss);
1069         if (error) {
1070                 /* should never happen */
1071                 dlm_free_rsb(r);
1072                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1073                 goto retry;
1074         }
1075
1076         if (result)
1077                 *result = DLM_LU_ADD;
1078         *r_nodeid = from_nodeid;
1079         error = 0;
1080  out_unlock:
1081         spin_unlock(&ls->ls_rsbtbl[b].lock);
1082         return error;
1083 }
1084
1085 static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
1086 {
1087         struct rb_node *n;
1088         struct dlm_rsb *r;
1089         int i;
1090
1091         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1092                 spin_lock(&ls->ls_rsbtbl[i].lock);
1093                 for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
1094                         r = rb_entry(n, struct dlm_rsb, res_hashnode);
1095                         if (r->res_hash == hash)
1096                                 dlm_dump_rsb(r);
1097                 }
1098                 spin_unlock(&ls->ls_rsbtbl[i].lock);
1099         }
1100 }
1101
1102 void dlm_dump_rsb_name(struct dlm_ls *ls, char *name, int len)
1103 {
1104         struct dlm_rsb *r = NULL;
1105         uint32_t hash, b;
1106         int error;
1107
1108         hash = jhash(name, len, 0);
1109         b = hash & (ls->ls_rsbtbl_size - 1);
1110
1111         spin_lock(&ls->ls_rsbtbl[b].lock);
1112         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
1113         if (!error)
1114                 goto out_dump;
1115
1116         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1117         if (error)
1118                 goto out;
1119  out_dump:
1120         dlm_dump_rsb(r);
1121  out:
1122         spin_unlock(&ls->ls_rsbtbl[b].lock);
1123 }
1124
1125 static void toss_rsb(struct kref *kref)
1126 {
1127         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1128         struct dlm_ls *ls = r->res_ls;
1129
1130         DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
1131         kref_init(&r->res_ref);
1132         rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
1133         rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
1134         r->res_toss_time = jiffies;
1135         if (r->res_lvbptr) {
1136                 dlm_free_lvb(r->res_lvbptr);
1137                 r->res_lvbptr = NULL;
1138         }
1139 }
1140
1141 /* See comment for unhold_lkb */
1142
1143 static void unhold_rsb(struct dlm_rsb *r)
1144 {
1145         int rv;
1146         rv = kref_put(&r->res_ref, toss_rsb);
1147         DLM_ASSERT(!rv, dlm_dump_rsb(r););
1148 }
1149
1150 static void kill_rsb(struct kref *kref)
1151 {
1152         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1153
1154         /* All work is done after the return from kref_put() so we
1155            can release the write_lock before the remove and free. */
1156
1157         DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
1158         DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
1159         DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
1160         DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
1161         DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
1162         DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
1163 }
1164
1165 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
1166    The rsb must exist as long as any lkb's for it do. */
1167
1168 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1169 {
1170         hold_rsb(r);
1171         lkb->lkb_resource = r;
1172 }
1173
1174 static void detach_lkb(struct dlm_lkb *lkb)
1175 {
1176         if (lkb->lkb_resource) {
1177                 put_rsb(lkb->lkb_resource);
1178                 lkb->lkb_resource = NULL;
1179         }
1180 }
1181
1182 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
1183 {
1184         struct dlm_lkb *lkb;
1185         int rv, id;
1186
1187         lkb = dlm_allocate_lkb(ls);
1188         if (!lkb)
1189                 return -ENOMEM;
1190
1191         lkb->lkb_nodeid = -1;
1192         lkb->lkb_grmode = DLM_LOCK_IV;
1193         kref_init(&lkb->lkb_ref);
1194         INIT_LIST_HEAD(&lkb->lkb_ownqueue);
1195         INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
1196         INIT_LIST_HEAD(&lkb->lkb_time_list);
1197         INIT_LIST_HEAD(&lkb->lkb_cb_list);
1198         mutex_init(&lkb->lkb_cb_mutex);
1199         INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);
1200
1201  retry:
1202         rv = idr_pre_get(&ls->ls_lkbidr, GFP_NOFS);
1203         if (!rv)
1204                 return -ENOMEM;
1205
1206         spin_lock(&ls->ls_lkbidr_spin);
1207         rv = idr_get_new_above(&ls->ls_lkbidr, lkb, 1, &id);
1208         if (!rv)
1209                 lkb->lkb_id = id;
1210         spin_unlock(&ls->ls_lkbidr_spin);
1211
1212         if (rv == -EAGAIN)
1213                 goto retry;
1214
1215         if (rv < 0) {
1216                 log_error(ls, "create_lkb idr error %d", rv);
1217                 return rv;
1218         }
1219
1220         *lkb_ret = lkb;
1221         return 0;
1222 }
1223
1224 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
1225 {
1226         struct dlm_lkb *lkb;
1227
1228         spin_lock(&ls->ls_lkbidr_spin);
1229         lkb = idr_find(&ls->ls_lkbidr, lkid);
1230         if (lkb)
1231                 kref_get(&lkb->lkb_ref);
1232         spin_unlock(&ls->ls_lkbidr_spin);
1233
1234         *lkb_ret = lkb;
1235         return lkb ? 0 : -ENOENT;
1236 }
1237
1238 static void kill_lkb(struct kref *kref)
1239 {
1240         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1241
1242         /* All work is done after the return from kref_put() so we
1243            can release the write_lock before the detach_lkb */
1244
1245         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1246 }
1247
1248 /* __put_lkb() is used when an lkb may not have an rsb attached to
1249    it so we need to provide the lockspace explicitly */
1250
1251 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
1252 {
1253         uint32_t lkid = lkb->lkb_id;
1254
1255         spin_lock(&ls->ls_lkbidr_spin);
1256         if (kref_put(&lkb->lkb_ref, kill_lkb)) {
1257                 idr_remove(&ls->ls_lkbidr, lkid);
1258                 spin_unlock(&ls->ls_lkbidr_spin);
1259
1260                 detach_lkb(lkb);
1261
1262                 /* for local/process lkbs, lvbptr points to caller's lksb */
1263                 if (lkb->lkb_lvbptr && is_master_copy(lkb))
1264                         dlm_free_lvb(lkb->lkb_lvbptr);
1265                 dlm_free_lkb(lkb);
1266                 return 1;
1267         } else {
1268                 spin_unlock(&ls->ls_lkbidr_spin);
1269                 return 0;
1270         }
1271 }
1272
1273 int dlm_put_lkb(struct dlm_lkb *lkb)
1274 {
1275         struct dlm_ls *ls;
1276
1277         DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
1278         DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
1279
1280         ls = lkb->lkb_resource->res_ls;
1281         return __put_lkb(ls, lkb);
1282 }
1283
1284 /* This is only called to add a reference when the code already holds
1285    a valid reference to the lkb, so there's no need for locking. */
1286
1287 static inline void hold_lkb(struct dlm_lkb *lkb)
1288 {
1289         kref_get(&lkb->lkb_ref);
1290 }
1291
1292 /* This is called when we need to remove a reference and are certain
1293    it's not the last ref.  e.g. del_lkb is always called between a
1294    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
1295    put_lkb would work fine, but would involve unnecessary locking */
1296
1297 static inline void unhold_lkb(struct dlm_lkb *lkb)
1298 {
1299         int rv;
1300         rv = kref_put(&lkb->lkb_ref, kill_lkb);
1301         DLM_ASSERT(!rv, dlm_print_lkb(lkb););
1302 }
1303
1304 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
1305                             int mode)
1306 {
1307         struct dlm_lkb *lkb = NULL;
1308
1309         list_for_each_entry(lkb, head, lkb_statequeue)
1310                 if (lkb->lkb_rqmode < mode)
1311                         break;
1312
1313         __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
1314 }
1315
1316 /* add/remove lkb to rsb's grant/convert/wait queue */
1317
1318 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
1319 {
1320         kref_get(&lkb->lkb_ref);
1321
1322         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1323
1324         lkb->lkb_timestamp = ktime_get();
1325
1326         lkb->lkb_status = status;
1327
1328         switch (status) {
1329         case DLM_LKSTS_WAITING:
1330                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1331                         list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
1332                 else
1333                         list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
1334                 break;
1335         case DLM_LKSTS_GRANTED:
1336                 /* convention says granted locks kept in order of grmode */
1337                 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
1338                                 lkb->lkb_grmode);
1339                 break;
1340         case DLM_LKSTS_CONVERT:
1341                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1342                         list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
1343                 else
1344                         list_add_tail(&lkb->lkb_statequeue,
1345                                       &r->res_convertqueue);
1346                 break;
1347         default:
1348                 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
1349         }
1350 }
1351
1352 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1353 {
1354         lkb->lkb_status = 0;
1355         list_del(&lkb->lkb_statequeue);
1356         unhold_lkb(lkb);
1357 }
1358
1359 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
1360 {
1361         hold_lkb(lkb);
1362         del_lkb(r, lkb);
1363         add_lkb(r, lkb, sts);
1364         unhold_lkb(lkb);
1365 }
1366
1367 static int msg_reply_type(int mstype)
1368 {
1369         switch (mstype) {
1370         case DLM_MSG_REQUEST:
1371                 return DLM_MSG_REQUEST_REPLY;
1372         case DLM_MSG_CONVERT:
1373                 return DLM_MSG_CONVERT_REPLY;
1374         case DLM_MSG_UNLOCK:
1375                 return DLM_MSG_UNLOCK_REPLY;
1376         case DLM_MSG_CANCEL:
1377                 return DLM_MSG_CANCEL_REPLY;
1378         case DLM_MSG_LOOKUP:
1379                 return DLM_MSG_LOOKUP_REPLY;
1380         }
1381         return -1;
1382 }
1383
1384 static int nodeid_warned(int nodeid, int num_nodes, int *warned)
1385 {
1386         int i;
1387
1388         for (i = 0; i < num_nodes; i++) {
1389                 if (!warned[i]) {
1390                         warned[i] = nodeid;
1391                         return 0;
1392                 }
1393                 if (warned[i] == nodeid)
1394                         return 1;
1395         }
1396         return 0;
1397 }
1398
1399 void dlm_scan_waiters(struct dlm_ls *ls)
1400 {
1401         struct dlm_lkb *lkb;
1402         ktime_t zero = ktime_set(0, 0);
1403         s64 us;
1404         s64 debug_maxus = 0;
1405         u32 debug_scanned = 0;
1406         u32 debug_expired = 0;
1407         int num_nodes = 0;
1408         int *warned = NULL;
1409
1410         if (!dlm_config.ci_waitwarn_us)
1411                 return;
1412
1413         mutex_lock(&ls->ls_waiters_mutex);
1414
1415         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
1416                 if (ktime_equal(lkb->lkb_wait_time, zero))
1417                         continue;
1418
1419                 debug_scanned++;
1420
1421                 us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time));
1422
1423                 if (us < dlm_config.ci_waitwarn_us)
1424                         continue;
1425
1426                 lkb->lkb_wait_time = zero;
1427
1428                 debug_expired++;
1429                 if (us > debug_maxus)
1430                         debug_maxus = us;
1431
1432                 if (!num_nodes) {
1433                         num_nodes = ls->ls_num_nodes;
1434                         warned = kzalloc(num_nodes * sizeof(int), GFP_KERNEL);
1435                 }
1436                 if (!warned)
1437                         continue;
1438                 if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned))
1439                         continue;
1440
1441                 log_error(ls, "waitwarn %x %lld %d us check connection to "
1442                           "node %d", lkb->lkb_id, (long long)us,
1443                           dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
1444         }
1445         mutex_unlock(&ls->ls_waiters_mutex);
1446         kfree(warned);
1447
1448         if (debug_expired)
1449                 log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us",
1450                           debug_scanned, debug_expired,
1451                           dlm_config.ci_waitwarn_us, (long long)debug_maxus);
1452 }
1453
1454 /* add/remove lkb from global waiters list of lkb's waiting for
1455    a reply from a remote node */
1456
1457 static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
1458 {
1459         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1460         int error = 0;
1461
1462         mutex_lock(&ls->ls_waiters_mutex);
1463
1464         if (is_overlap_unlock(lkb) ||
1465             (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
1466                 error = -EINVAL;
1467                 goto out;
1468         }
1469
1470         if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
1471                 switch (mstype) {
1472                 case DLM_MSG_UNLOCK:
1473                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
1474                         break;
1475                 case DLM_MSG_CANCEL:
1476                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
1477                         break;
1478                 default:
1479                         error = -EBUSY;
1480                         goto out;
1481                 }
1482                 lkb->lkb_wait_count++;
1483                 hold_lkb(lkb);
1484
1485                 log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
1486                           lkb->lkb_id, lkb->lkb_wait_type, mstype,
1487                           lkb->lkb_wait_count, lkb->lkb_flags);
1488                 goto out;
1489         }
1490
1491         DLM_ASSERT(!lkb->lkb_wait_count,
1492                    dlm_print_lkb(lkb);
1493                    printk("wait_count %d\n", lkb->lkb_wait_count););
1494
1495         lkb->lkb_wait_count++;
1496         lkb->lkb_wait_type = mstype;
1497         lkb->lkb_wait_time = ktime_get();
1498         lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
1499         hold_lkb(lkb);
1500         list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
1501  out:
1502         if (error)
1503                 log_error(ls, "addwait error %x %d flags %x %d %d %s",
1504                           lkb->lkb_id, error, lkb->lkb_flags, mstype,
1505                           lkb->lkb_wait_type, lkb->lkb_resource->res_name);
1506         mutex_unlock(&ls->ls_waiters_mutex);
1507         return error;
1508 }
1509
1510 /* We clear the RESEND flag because we might be taking an lkb off the waiters
1511    list as part of process_requestqueue (e.g. a lookup that has an optimized
1512    request reply on the requestqueue) between dlm_recover_waiters_pre() which
1513    set RESEND and dlm_recover_waiters_post() */
1514
1515 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
1516                                 struct dlm_message *ms)
1517 {
1518         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1519         int overlap_done = 0;
1520
1521         if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
1522                 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
1523                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
1524                 overlap_done = 1;
1525                 goto out_del;
1526         }
1527
1528         if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
1529                 log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
1530                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
1531                 overlap_done = 1;
1532                 goto out_del;
1533         }
1534
1535         /* Cancel state was preemptively cleared by a successful convert,
1536            see next comment, nothing to do. */
1537
1538         if ((mstype == DLM_MSG_CANCEL_REPLY) &&
1539             (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
1540                 log_debug(ls, "remwait %x cancel_reply wait_type %d",
1541                           lkb->lkb_id, lkb->lkb_wait_type);
1542                 return -1;
1543         }
1544
1545         /* Remove for the convert reply, and premptively remove for the
1546            cancel reply.  A convert has been granted while there's still
1547            an outstanding cancel on it (the cancel is moot and the result
1548            in the cancel reply should be 0).  We preempt the cancel reply
1549            because the app gets the convert result and then can follow up
1550            with another op, like convert.  This subsequent op would see the
1551            lingering state of the cancel and fail with -EBUSY. */
1552
1553         if ((mstype == DLM_MSG_CONVERT_REPLY) &&
1554             (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
1555             is_overlap_cancel(lkb) && ms && !ms->m_result) {
1556                 log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
1557                           lkb->lkb_id);
1558                 lkb->lkb_wait_type = 0;
1559                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
1560                 lkb->lkb_wait_count--;
1561                 goto out_del;
1562         }
1563
1564         /* N.B. type of reply may not always correspond to type of original
1565            msg due to lookup->request optimization, verify others? */
1566
1567         if (lkb->lkb_wait_type) {
1568                 lkb->lkb_wait_type = 0;
1569                 goto out_del;
1570         }
1571
1572         log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
1573                   lkb->lkb_id, ms ? ms->m_header.h_nodeid : 0, lkb->lkb_remid,
1574                   mstype, lkb->lkb_flags);
1575         return -1;
1576
1577  out_del:
1578         /* the force-unlock/cancel has completed and we haven't recvd a reply
1579            to the op that was in progress prior to the unlock/cancel; we
1580            give up on any reply to the earlier op.  FIXME: not sure when/how
1581            this would happen */
1582
1583         if (overlap_done && lkb->lkb_wait_type) {
1584                 log_error(ls, "remwait error %x reply %d wait_type %d overlap",
1585                           lkb->lkb_id, mstype, lkb->lkb_wait_type);
1586                 lkb->lkb_wait_count--;
1587                 lkb->lkb_wait_type = 0;
1588         }
1589
1590         DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
1591
1592         lkb->lkb_flags &= ~DLM_IFL_RESEND;
1593         lkb->lkb_wait_count--;
1594         if (!lkb->lkb_wait_count)
1595                 list_del_init(&lkb->lkb_wait_reply);
1596         unhold_lkb(lkb);
1597         return 0;
1598 }
1599
1600 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
1601 {
1602         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1603         int error;
1604
1605         mutex_lock(&ls->ls_waiters_mutex);
1606         error = _remove_from_waiters(lkb, mstype, NULL);
1607         mutex_unlock(&ls->ls_waiters_mutex);
1608         return error;
1609 }
1610
1611 /* Handles situations where we might be processing a "fake" or "stub" reply in
1612    which we can't try to take waiters_mutex again. */
1613
1614 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
1615 {
1616         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1617         int error;
1618
1619         if (ms->m_flags != DLM_IFL_STUB_MS)
1620                 mutex_lock(&ls->ls_waiters_mutex);
1621         error = _remove_from_waiters(lkb, ms->m_type, ms);
1622         if (ms->m_flags != DLM_IFL_STUB_MS)
1623                 mutex_unlock(&ls->ls_waiters_mutex);
1624         return error;
1625 }
1626
1627 /* If there's an rsb for the same resource being removed, ensure
1628    that the remove message is sent before the new lookup message.
1629    It should be rare to need a delay here, but if not, then it may
1630    be worthwhile to add a proper wait mechanism rather than a delay. */
1631
1632 static void wait_pending_remove(struct dlm_rsb *r)
1633 {
1634         struct dlm_ls *ls = r->res_ls;
1635  restart:
1636         spin_lock(&ls->ls_remove_spin);
1637         if (ls->ls_remove_len &&
1638             !rsb_cmp(r, ls->ls_remove_name, ls->ls_remove_len)) {
1639                 log_debug(ls, "delay lookup for remove dir %d %s",
1640                           r->res_dir_nodeid, r->res_name);
1641                 spin_unlock(&ls->ls_remove_spin);
1642                 msleep(1);
1643                 goto restart;
1644         }
1645         spin_unlock(&ls->ls_remove_spin);
1646 }
1647
1648 /*
1649  * ls_remove_spin protects ls_remove_name and ls_remove_len which are
1650  * read by other threads in wait_pending_remove.  ls_remove_names
1651  * and ls_remove_lens are only used by the scan thread, so they do
1652  * not need protection.
1653  */
1654
1655 static void shrink_bucket(struct dlm_ls *ls, int b)
1656 {
1657         struct rb_node *n, *next;
1658         struct dlm_rsb *r;
1659         char *name;
1660         int our_nodeid = dlm_our_nodeid();
1661         int remote_count = 0;
1662         int i, len, rv;
1663
1664         memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX);
1665
1666         spin_lock(&ls->ls_rsbtbl[b].lock);
1667         for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) {
1668                 next = rb_next(n);
1669                 r = rb_entry(n, struct dlm_rsb, res_hashnode);
1670
1671                 /* If we're the directory record for this rsb, and
1672                    we're not the master of it, then we need to wait
1673                    for the master node to send us a dir remove for
1674                    before removing the dir record. */
1675
1676                 if (!dlm_no_directory(ls) &&
1677                     (r->res_master_nodeid != our_nodeid) &&
1678                     (dlm_dir_nodeid(r) == our_nodeid)) {
1679                         continue;
1680                 }
1681
1682                 if (!time_after_eq(jiffies, r->res_toss_time +
1683                                    dlm_config.ci_toss_secs * HZ)) {
1684                         continue;
1685                 }
1686
1687                 if (!dlm_no_directory(ls) &&
1688                     (r->res_master_nodeid == our_nodeid) &&
1689                     (dlm_dir_nodeid(r) != our_nodeid)) {
1690
1691                         /* We're the master of this rsb but we're not
1692                            the directory record, so we need to tell the
1693                            dir node to remove the dir record. */
1694
1695                         ls->ls_remove_lens[remote_count] = r->res_length;
1696                         memcpy(ls->ls_remove_names[remote_count], r->res_name,
1697                                DLM_RESNAME_MAXLEN);
1698                         remote_count++;
1699
1700                         if (remote_count >= DLM_REMOVE_NAMES_MAX)
1701                                 break;
1702                         continue;
1703                 }
1704
1705                 if (!kref_put(&r->res_ref, kill_rsb)) {
1706                         log_error(ls, "tossed rsb in use %s", r->res_name);
1707                         continue;
1708                 }
1709
1710                 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1711                 dlm_free_rsb(r);
1712         }
1713         spin_unlock(&ls->ls_rsbtbl[b].lock);
1714
1715         /*
1716          * While searching for rsb's to free, we found some that require
1717          * remote removal.  We leave them in place and find them again here
1718          * so there is a very small gap between removing them from the toss
1719          * list and sending the removal.  Keeping this gap small is
1720          * important to keep us (the master node) from being out of sync
1721          * with the remote dir node for very long.
1722          *
1723          * From the time the rsb is removed from toss until just after
1724          * send_remove, the rsb name is saved in ls_remove_name.  A new
1725          * lookup checks this to ensure that a new lookup message for the
1726          * same resource name is not sent just before the remove message.
1727          */
1728
1729         for (i = 0; i < remote_count; i++) {
1730                 name = ls->ls_remove_names[i];
1731                 len = ls->ls_remove_lens[i];
1732
1733                 spin_lock(&ls->ls_rsbtbl[b].lock);
1734                 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1735                 if (rv) {
1736                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1737                         log_debug(ls, "remove_name not toss %s", name);
1738                         continue;
1739                 }
1740
1741                 if (r->res_master_nodeid != our_nodeid) {
1742                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1743                         log_debug(ls, "remove_name master %d dir %d our %d %s",
1744                                   r->res_master_nodeid, r->res_dir_nodeid,
1745                                   our_nodeid, name);
1746                         continue;
1747                 }
1748
1749                 if (r->res_dir_nodeid == our_nodeid) {
1750                         /* should never happen */
1751                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1752                         log_error(ls, "remove_name dir %d master %d our %d %s",
1753                                   r->res_dir_nodeid, r->res_master_nodeid,
1754                                   our_nodeid, name);
1755                         continue;
1756                 }
1757
1758                 if (!time_after_eq(jiffies, r->res_toss_time +
1759                                    dlm_config.ci_toss_secs * HZ)) {
1760                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1761                         log_debug(ls, "remove_name toss_time %lu now %lu %s",
1762                                   r->res_toss_time, jiffies, name);
1763                         continue;
1764                 }
1765
1766                 if (!kref_put(&r->res_ref, kill_rsb)) {
1767                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1768                         log_error(ls, "remove_name in use %s", name);
1769                         continue;
1770                 }
1771
1772                 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1773
1774                 /* block lookup of same name until we've sent remove */
1775                 spin_lock(&ls->ls_remove_spin);
1776                 ls->ls_remove_len = len;
1777                 memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
1778                 spin_unlock(&ls->ls_remove_spin);
1779                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1780
1781                 send_remove(r);
1782
1783                 /* allow lookup of name again */
1784                 spin_lock(&ls->ls_remove_spin);
1785                 ls->ls_remove_len = 0;
1786                 memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
1787                 spin_unlock(&ls->ls_remove_spin);
1788
1789                 dlm_free_rsb(r);
1790         }
1791 }
1792
1793 void dlm_scan_rsbs(struct dlm_ls *ls)
1794 {
1795         int i;
1796
1797         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1798                 shrink_bucket(ls, i);
1799                 if (dlm_locking_stopped(ls))
1800                         break;
1801                 cond_resched();
1802         }
1803 }
1804
1805 static void add_timeout(struct dlm_lkb *lkb)
1806 {
1807         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1808
1809         if (is_master_copy(lkb))
1810                 return;
1811
1812         if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1813             !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1814                 lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1815                 goto add_it;
1816         }
1817         if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1818                 goto add_it;
1819         return;
1820
1821  add_it:
1822         DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1823         mutex_lock(&ls->ls_timeout_mutex);
1824         hold_lkb(lkb);
1825         list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1826         mutex_unlock(&ls->ls_timeout_mutex);
1827 }
1828
1829 static void del_timeout(struct dlm_lkb *lkb)
1830 {
1831         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1832
1833         mutex_lock(&ls->ls_timeout_mutex);
1834         if (!list_empty(&lkb->lkb_time_list)) {
1835                 list_del_init(&lkb->lkb_time_list);
1836                 unhold_lkb(lkb);
1837         }
1838         mutex_unlock(&ls->ls_timeout_mutex);
1839 }
1840
1841 /* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1842    lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
1843    and then lock rsb because of lock ordering in add_timeout.  We may need
1844    to specify some special timeout-related bits in the lkb that are just to
1845    be accessed under the timeout_mutex. */
1846
1847 void dlm_scan_timeout(struct dlm_ls *ls)
1848 {
1849         struct dlm_rsb *r;
1850         struct dlm_lkb *lkb;
1851         int do_cancel, do_warn;
1852         s64 wait_us;
1853
1854         for (;;) {
1855                 if (dlm_locking_stopped(ls))
1856                         break;
1857
1858                 do_cancel = 0;
1859                 do_warn = 0;
1860                 mutex_lock(&ls->ls_timeout_mutex);
1861                 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
1862
1863                         wait_us = ktime_to_us(ktime_sub(ktime_get(),
1864                                                         lkb->lkb_timestamp));
1865
1866                         if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
1867                             wait_us >= (lkb->lkb_timeout_cs * 10000))
1868                                 do_cancel = 1;
1869
1870                         if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1871                             wait_us >= dlm_config.ci_timewarn_cs * 10000)
1872                                 do_warn = 1;
1873
1874                         if (!do_cancel && !do_warn)
1875                                 continue;
1876                         hold_lkb(lkb);
1877                         break;
1878                 }
1879                 mutex_unlock(&ls->ls_timeout_mutex);
1880
1881                 if (!do_cancel && !do_warn)
1882                         break;
1883
1884                 r = lkb->lkb_resource;
1885                 hold_rsb(r);
1886                 lock_rsb(r);
1887
1888                 if (do_warn) {
1889                         /* clear flag so we only warn once */
1890                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1891                         if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1892                                 del_timeout(lkb);
1893                         dlm_timeout_warn(lkb);
1894                 }
1895
1896                 if (do_cancel) {
1897                         log_debug(ls, "timeout cancel %x node %d %s",
1898                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1899                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1900                         lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1901                         del_timeout(lkb);
1902                         _cancel_lock(r, lkb);
1903                 }
1904
1905                 unlock_rsb(r);
1906                 unhold_rsb(r);
1907                 dlm_put_lkb(lkb);
1908         }
1909 }
1910
1911 /* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1912    dlm_recoverd before checking/setting ls_recover_begin. */
1913
1914 void dlm_adjust_timeouts(struct dlm_ls *ls)
1915 {
1916         struct dlm_lkb *lkb;
1917         u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);
1918
1919         ls->ls_recover_begin = 0;
1920         mutex_lock(&ls->ls_timeout_mutex);
1921         list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1922                 lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
1923         mutex_unlock(&ls->ls_timeout_mutex);
1924
1925         if (!dlm_config.ci_waitwarn_us)
1926                 return;
1927
1928         mutex_lock(&ls->ls_waiters_mutex);
1929         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
1930                 if (ktime_to_us(lkb->lkb_wait_time))
1931                         lkb->lkb_wait_time = ktime_get();
1932         }
1933         mutex_unlock(&ls->ls_waiters_mutex);
1934 }
1935
1936 /* lkb is master or local copy */
1937
1938 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1939 {
1940         int b, len = r->res_ls->ls_lvblen;
1941
1942         /* b=1 lvb returned to caller
1943            b=0 lvb written to rsb or invalidated
1944            b=-1 do nothing */
1945
1946         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1947
1948         if (b == 1) {
1949                 if (!lkb->lkb_lvbptr)
1950                         return;
1951
1952                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1953                         return;
1954
1955                 if (!r->res_lvbptr)
1956                         return;
1957
1958                 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1959                 lkb->lkb_lvbseq = r->res_lvbseq;
1960
1961         } else if (b == 0) {
1962                 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1963                         rsb_set_flag(r, RSB_VALNOTVALID);
1964                         return;
1965                 }
1966
1967                 if (!lkb->lkb_lvbptr)
1968                         return;
1969
1970                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1971                         return;
1972
1973                 if (!r->res_lvbptr)
1974                         r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1975
1976                 if (!r->res_lvbptr)
1977                         return;
1978
1979                 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1980                 r->res_lvbseq++;
1981                 lkb->lkb_lvbseq = r->res_lvbseq;
1982                 rsb_clear_flag(r, RSB_VALNOTVALID);
1983         }
1984
1985         if (rsb_flag(r, RSB_VALNOTVALID))
1986                 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1987 }
1988
1989 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1990 {
1991         if (lkb->lkb_grmode < DLM_LOCK_PW)
1992                 return;
1993
1994         if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1995                 rsb_set_flag(r, RSB_VALNOTVALID);
1996                 return;
1997         }
1998
1999         if (!lkb->lkb_lvbptr)
2000                 return;
2001
2002         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
2003                 return;
2004
2005         if (!r->res_lvbptr)
2006                 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
2007
2008         if (!r->res_lvbptr)
2009                 return;
2010
2011         memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2012         r->res_lvbseq++;
2013         rsb_clear_flag(r, RSB_VALNOTVALID);
2014 }
2015
2016 /* lkb is process copy (pc) */
2017
2018 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
2019                             struct dlm_message *ms)
2020 {
2021         int b;
2022
2023         if (!lkb->lkb_lvbptr)
2024                 return;
2025
2026         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
2027                 return;
2028
2029         b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
2030         if (b == 1) {
2031                 int len = receive_extralen(ms);
2032                 if (len > DLM_RESNAME_MAXLEN)
2033                         len = DLM_RESNAME_MAXLEN;
2034                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2035                 lkb->lkb_lvbseq = ms->m_lvbseq;
2036         }
2037 }
2038
2039 /* Manipulate lkb's on rsb's convert/granted/waiting queues
2040    remove_lock -- used for unlock, removes lkb from granted
2041    revert_lock -- used for cancel, moves lkb from convert to granted
2042    grant_lock  -- used for request and convert, adds lkb to granted or
2043                   moves lkb from convert or waiting to granted
2044
2045    Each of these is used for master or local copy lkb's.  There is
2046    also a _pc() variation used to make the corresponding change on
2047    a process copy (pc) lkb. */
2048
2049 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2050 {
2051         del_lkb(r, lkb);
2052         lkb->lkb_grmode = DLM_LOCK_IV;
2053         /* this unhold undoes the original ref from create_lkb()
2054            so this leads to the lkb being freed */
2055         unhold_lkb(lkb);
2056 }
2057
2058 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2059 {
2060         set_lvb_unlock(r, lkb);
2061         _remove_lock(r, lkb);
2062 }
2063
2064 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2065 {
2066         _remove_lock(r, lkb);
2067 }
2068
2069 /* returns: 0 did nothing
2070             1 moved lock to granted
2071            -1 removed lock */
2072
2073 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2074 {
2075         int rv = 0;
2076
2077         lkb->lkb_rqmode = DLM_LOCK_IV;
2078
2079         switch (lkb->lkb_status) {
2080         case DLM_LKSTS_GRANTED:
2081                 break;
2082         case DLM_LKSTS_CONVERT:
2083                 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2084                 rv = 1;
2085                 break;
2086         case DLM_LKSTS_WAITING:
2087                 del_lkb(r, lkb);
2088                 lkb->lkb_grmode = DLM_LOCK_IV;
2089                 /* this unhold undoes the original ref from create_lkb()
2090                    so this leads to the lkb being freed */
2091                 unhold_lkb(lkb);
2092                 rv = -1;
2093                 break;
2094         default:
2095                 log_print("invalid status for revert %d", lkb->lkb_status);
2096         }
2097         return rv;
2098 }
2099
2100 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2101 {
2102         return revert_lock(r, lkb);
2103 }
2104
2105 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2106 {
2107         if (lkb->lkb_grmode != lkb->lkb_rqmode) {
2108                 lkb->lkb_grmode = lkb->lkb_rqmode;
2109                 if (lkb->lkb_status)
2110                         move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2111                 else
2112                         add_lkb(r, lkb, DLM_LKSTS_GRANTED);
2113         }
2114
2115         lkb->lkb_rqmode = DLM_LOCK_IV;
2116         lkb->lkb_highbast = 0;
2117 }
2118
2119 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2120 {
2121         set_lvb_lock(r, lkb);
2122         _grant_lock(r, lkb);
2123 }
2124
2125 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
2126                           struct dlm_message *ms)
2127 {
2128         set_lvb_lock_pc(r, lkb, ms);
2129         _grant_lock(r, lkb);
2130 }
2131
2132 /* called by grant_pending_locks() which means an async grant message must
2133    be sent to the requesting node in addition to granting the lock if the
2134    lkb belongs to a remote node. */
2135
2136 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
2137 {
2138         grant_lock(r, lkb);
2139         if (is_master_copy(lkb))
2140                 send_grant(r, lkb);
2141         else
2142                 queue_cast(r, lkb, 0);
2143 }
2144
2145 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
2146    change the granted/requested modes.  We're munging things accordingly in
2147    the process copy.
2148    CONVDEADLK: our grmode may have been forced down to NL to resolve a
2149    conversion deadlock
2150    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
2151    compatible with other granted locks */
2152
2153 static void munge_demoted(struct dlm_lkb *lkb)
2154 {
2155         if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
2156                 log_print("munge_demoted %x invalid modes gr %d rq %d",
2157                           lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
2158                 return;
2159         }
2160
2161         lkb->lkb_grmode = DLM_LOCK_NL;
2162 }
2163
2164 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
2165 {
2166         if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
2167             ms->m_type != DLM_MSG_GRANT) {
2168                 log_print("munge_altmode %x invalid reply type %d",
2169                           lkb->lkb_id, ms->m_type);
2170                 return;
2171         }
2172
2173         if (lkb->lkb_exflags & DLM_LKF_ALTPR)
2174                 lkb->lkb_rqmode = DLM_LOCK_PR;
2175         else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
2176                 lkb->lkb_rqmode = DLM_LOCK_CW;
2177         else {
2178                 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
2179                 dlm_print_lkb(lkb);
2180         }
2181 }
2182
2183 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
2184 {
2185         struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
2186                                            lkb_statequeue);
2187         if (lkb->lkb_id == first->lkb_id)
2188                 return 1;
2189
2190         return 0;
2191 }
2192
2193 /* Check if the given lkb conflicts with another lkb on the queue. */
2194
2195 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
2196 {
2197         struct dlm_lkb *this;
2198
2199         list_for_each_entry(this, head, lkb_statequeue) {
2200                 if (this == lkb)
2201                         continue;
2202                 if (!modes_compat(this, lkb))
2203                         return 1;
2204         }
2205         return 0;
2206 }
2207
2208 /*
2209  * "A conversion deadlock arises with a pair of lock requests in the converting
2210  * queue for one resource.  The granted mode of each lock blocks the requested
2211  * mode of the other lock."
2212  *
2213  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
2214  * convert queue from being granted, then deadlk/demote lkb.
2215  *
2216  * Example:
2217  * Granted Queue: empty
2218  * Convert Queue: NL->EX (first lock)
2219  *                PR->EX (second lock)
2220  *
2221  * The first lock can't be granted because of the granted mode of the second
2222  * lock and the second lock can't be granted because it's not first in the
2223  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
2224  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
2225  * flag set and return DEMOTED in the lksb flags.
2226  *
2227  * Originally, this function detected conv-deadlk in a more limited scope:
2228  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
2229  * - if lkb1 was the first entry in the queue (not just earlier), and was
2230  *   blocked by the granted mode of lkb2, and there was nothing on the
2231  *   granted queue preventing lkb1 from being granted immediately, i.e.
2232  *   lkb2 was the only thing preventing lkb1 from being granted.
2233  *
2234  * That second condition meant we'd only say there was conv-deadlk if
2235  * resolving it (by demotion) would lead to the first lock on the convert
2236  * queue being granted right away.  It allowed conversion deadlocks to exist
2237  * between locks on the convert queue while they couldn't be granted anyway.
2238  *
2239  * Now, we detect and take action on conversion deadlocks immediately when
2240  * they're created, even if they may not be immediately consequential.  If
2241  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
2242  * mode that would prevent lkb1's conversion from being granted, we do a
2243  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
2244  * I think this means that the lkb_is_ahead condition below should always
2245  * be zero, i.e. there will never be conv-deadlk between two locks that are
2246  * both already on the convert queue.
2247  */
2248
2249 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
2250 {
2251         struct dlm_lkb *lkb1;
2252         int lkb_is_ahead = 0;
2253
2254         list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
2255                 if (lkb1 == lkb2) {
2256                         lkb_is_ahead = 1;
2257                         continue;
2258                 }
2259
2260                 if (!lkb_is_ahead) {
2261                         if (!modes_compat(lkb2, lkb1))
2262                                 return 1;
2263                 } else {
2264                         if (!modes_compat(lkb2, lkb1) &&
2265                             !modes_compat(lkb1, lkb2))
2266                                 return 1;
2267                 }
2268         }
2269         return 0;
2270 }
2271
2272 /*
2273  * Return 1 if the lock can be granted, 0 otherwise.
2274  * Also detect and resolve conversion deadlocks.
2275  *
2276  * lkb is the lock to be granted
2277  *
2278  * now is 1 if the function is being called in the context of the
2279  * immediate request, it is 0 if called later, after the lock has been
2280  * queued.
2281  *
2282  * recover is 1 if dlm_recover_grant() is trying to grant conversions
2283  * after recovery.
2284  *
2285  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
2286  */
2287
2288 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2289                            int recover)
2290 {
2291         int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
2292
2293         /*
2294          * 6-10: Version 5.4 introduced an option to address the phenomenon of
2295          * a new request for a NL mode lock being blocked.
2296          *
2297          * 6-11: If the optional EXPEDITE flag is used with the new NL mode
2298          * request, then it would be granted.  In essence, the use of this flag
2299          * tells the Lock Manager to expedite theis request by not considering
2300          * what may be in the CONVERTING or WAITING queues...  As of this
2301          * writing, the EXPEDITE flag can be used only with new requests for NL
2302          * mode locks.  This flag is not valid for conversion requests.
2303          *
2304          * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
2305          * conversion or used with a non-NL requested mode.  We also know an
2306          * EXPEDITE request is always granted immediately, so now must always
2307          * be 1.  The full condition to grant an expedite request: (now &&
2308          * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
2309          * therefore be shortened to just checking the flag.
2310          */
2311
2312         if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
2313                 return 1;
2314
2315         /*
2316          * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
2317          * added to the remaining conditions.
2318          */
2319
2320         if (queue_conflict(&r->res_grantqueue, lkb))
2321                 return 0;
2322
2323         /*
2324          * 6-3: By default, a conversion request is immediately granted if the
2325          * requested mode is compatible with the modes of all other granted
2326          * locks
2327          */
2328
2329         if (queue_conflict(&r->res_convertqueue, lkb))
2330                 return 0;
2331
2332         /*
2333          * The RECOVER_GRANT flag means dlm_recover_grant() is granting
2334          * locks for a recovered rsb, on which lkb's have been rebuilt.
2335          * The lkb's may have been rebuilt on the queues in a different
2336          * order than they were in on the previous master.  So, granting
2337          * queued conversions in order after recovery doesn't make sense
2338          * since the order hasn't been preserved anyway.  The new order
2339          * could also have created a new "in place" conversion deadlock.
2340          * (e.g. old, failed master held granted EX, with PR->EX, NL->EX.
2341          * After recovery, there would be no granted locks, and possibly
2342          * NL->EX, PR->EX, an in-place conversion deadlock.)  So, after
2343          * recovery, grant conversions without considering order.
2344          */
2345
2346         if (conv && recover)
2347                 return 1;
2348
2349         /*
2350          * 6-5: But the default algorithm for deciding whether to grant or
2351          * queue conversion requests does not by itself guarantee that such
2352          * requests are serviced on a "first come first serve" basis.  This, in
2353          * turn, can lead to a phenomenon known as "indefinate postponement".
2354          *
2355          * 6-7: This issue is dealt with by using the optional QUECVT flag with
2356          * the system service employed to request a lock conversion.  This flag
2357          * forces certain conversion requests to be queued, even if they are
2358          * compatible with the granted modes of other locks on the same
2359          * resource.  Thus, the use of this flag results in conversion requests
2360          * being ordered on a "first come first servce" basis.
2361          *
2362          * DCT: This condition is all about new conversions being able to occur
2363          * "in place" while the lock remains on the granted queue (assuming
2364          * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
2365          * doesn't _have_ to go onto the convert queue where it's processed in
2366          * order.  The "now" variable is necessary to distinguish converts
2367          * being received and processed for the first time now, because once a
2368          * convert is moved to the conversion queue the condition below applies
2369          * requiring fifo granting.
2370          */
2371
2372         if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
2373                 return 1;
2374
2375         /*
2376          * Even if the convert is compat with all granted locks,
2377          * QUECVT forces it behind other locks on the convert queue.
2378          */
2379
2380         if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) {
2381                 if (list_empty(&r->res_convertqueue))
2382                         return 1;
2383                 else
2384                         return 0;
2385         }
2386
2387         /*
2388          * The NOORDER flag is set to avoid the standard vms rules on grant
2389          * order.
2390          */
2391
2392         if (lkb->lkb_exflags & DLM_LKF_NOORDER)
2393                 return 1;
2394
2395         /*
2396          * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
2397          * granted until all other conversion requests ahead of it are granted
2398          * and/or canceled.
2399          */
2400
2401         if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
2402                 return 1;
2403
2404         /*
2405          * 6-4: By default, a new request is immediately granted only if all
2406          * three of the following conditions are satisfied when the request is
2407          * issued:
2408          * - The queue of ungranted conversion requests for the resource is
2409          *   empty.
2410          * - The queue of ungranted new requests for the resource is empty.
2411          * - The mode of the new request is compatible with the most
2412          *   restrictive mode of all granted locks on the resource.
2413          */
2414
2415         if (now && !conv && list_empty(&r->res_convertqueue) &&
2416             list_empty(&r->res_waitqueue))
2417                 return 1;
2418
2419         /*
2420          * 6-4: Once a lock request is in the queue of ungranted new requests,
2421          * it cannot be granted until the queue of ungranted conversion
2422          * requests is empty, all ungranted new requests ahead of it are
2423          * granted and/or canceled, and it is compatible with the granted mode
2424          * of the most restrictive lock granted on the resource.
2425          */
2426
2427         if (!now && !conv && list_empty(&r->res_convertqueue) &&
2428             first_in_list(lkb, &r->res_waitqueue))
2429                 return 1;
2430
2431         return 0;
2432 }
2433
2434 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2435                           int recover, int *err)
2436 {
2437         int rv;
2438         int8_t alt = 0, rqmode = lkb->lkb_rqmode;
2439         int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
2440
2441         if (err)
2442                 *err = 0;
2443
2444         rv = _can_be_granted(r, lkb, now, recover);
2445         if (rv)
2446                 goto out;
2447
2448         /*
2449          * The CONVDEADLK flag is non-standard and tells the dlm to resolve
2450          * conversion deadlocks by demoting grmode to NL, otherwise the dlm
2451          * cancels one of the locks.
2452          */
2453
2454         if (is_convert && can_be_queued(lkb) &&
2455             conversion_deadlock_detect(r, lkb)) {
2456                 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
2457                         lkb->lkb_grmode = DLM_LOCK_NL;
2458                         lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
2459                 } else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
2460                         if (err)
2461                                 *err = -EDEADLK;
2462                         else {
2463                                 log_print("can_be_granted deadlock %x now %d",
2464                                           lkb->lkb_id, now);
2465                                 dlm_dump_rsb(r);
2466                         }
2467                 }
2468                 goto out;
2469         }
2470
2471         /*
2472          * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
2473          * to grant a request in a mode other than the normal rqmode.  It's a
2474          * simple way to provide a big optimization to applications that can
2475          * use them.
2476          */
2477
2478         if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
2479                 alt = DLM_LOCK_PR;
2480         else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
2481                 alt = DLM_LOCK_CW;
2482
2483         if (alt) {
2484                 lkb->lkb_rqmode = alt;
2485                 rv = _can_be_granted(r, lkb, now, 0);
2486                 if (rv)
2487                         lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
2488                 else
2489                         lkb->lkb_rqmode = rqmode;
2490         }
2491  out:
2492         return rv;
2493 }
2494
2495 /* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
2496    for locks pending on the convert list.  Once verified (watch for these
2497    log_prints), we should be able to just call _can_be_granted() and not
2498    bother with the demote/deadlk cases here (and there's no easy way to deal
2499    with a deadlk here, we'd have to generate something like grant_lock with
2500    the deadlk error.) */
2501
2502 /* Returns the highest requested mode of all blocked conversions; sets
2503    cw if there's a blocked conversion to DLM_LOCK_CW. */
2504
2505 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
2506                                  unsigned int *count)
2507 {
2508         struct dlm_lkb *lkb, *s;
2509         int recover = rsb_flag(r, RSB_RECOVER_GRANT);
2510         int hi, demoted, quit, grant_restart, demote_restart;
2511         int deadlk;
2512
2513         quit = 0;
2514  restart:
2515         grant_restart = 0;
2516         demote_restart = 0;
2517         hi = DLM_LOCK_IV;
2518
2519         list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
2520                 demoted = is_demoted(lkb);
2521                 deadlk = 0;
2522
2523                 if (can_be_granted(r, lkb, 0, recover, &deadlk)) {
2524                         grant_lock_pending(r, lkb);
2525                         grant_restart = 1;
2526                         if (count)
2527                                 (*count)++;
2528                         continue;
2529                 }
2530
2531                 if (!demoted && is_demoted(lkb)) {
2532                         log_print("WARN: pending demoted %x node %d %s",
2533                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2534                         demote_restart = 1;
2535                         continue;
2536                 }
2537
2538                 if (deadlk) {
2539                         log_print("WARN: pending deadlock %x node %d %s",
2540                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2541                         dlm_dump_rsb(r);
2542                         continue;
2543                 }
2544
2545                 hi = max_t(int, lkb->lkb_rqmode, hi);
2546
2547                 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
2548                         *cw = 1;
2549         }
2550
2551         if (grant_restart)
2552                 goto restart;
2553         if (demote_restart && !quit) {
2554                 quit = 1;
2555                 goto restart;
2556         }
2557
2558         return max_t(int, high, hi);
2559 }
2560
2561 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
2562                               unsigned int *count)
2563 {
2564         struct dlm_lkb *lkb, *s;
2565
2566         list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
2567                 if (can_be_granted(r, lkb, 0, 0, NULL)) {
2568                         grant_lock_pending(r, lkb);
2569                         if (count)
2570                                 (*count)++;
2571                 } else {
2572                         high = max_t(int, lkb->lkb_rqmode, high);
2573                         if (lkb->lkb_rqmode == DLM_LOCK_CW)
2574                                 *cw = 1;
2575                 }
2576         }
2577
2578         return high;
2579 }
2580
2581 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
2582    on either the convert or waiting queue.
2583    high is the largest rqmode of all locks blocked on the convert or
2584    waiting queue. */
2585
2586 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
2587 {
2588         if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
2589                 if (gr->lkb_highbast < DLM_LOCK_EX)
2590                         return 1;
2591                 return 0;
2592         }
2593
2594         if (gr->lkb_highbast < high &&
2595             !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
2596                 return 1;
2597         return 0;
2598 }
2599
2600 static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
2601 {
2602         struct dlm_lkb *lkb, *s;
2603         int high = DLM_LOCK_IV;
2604         int cw = 0;
2605
2606         if (!is_master(r)) {
2607                 log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
2608                 dlm_dump_rsb(r);
2609                 return;
2610         }
2611
2612         high = grant_pending_convert(r, high, &cw, count);
2613         high = grant_pending_wait(r, high, &cw, count);
2614
2615         if (high == DLM_LOCK_IV)
2616                 return;
2617
2618         /*
2619          * If there are locks left on the wait/convert queue then send blocking
2620          * ASTs to granted locks based on the largest requested mode (high)
2621          * found above.
2622          */
2623
2624         list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
2625                 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
2626                         if (cw && high == DLM_LOCK_PR &&
2627                             lkb->lkb_grmode == DLM_LOCK_PR)
2628                                 queue_bast(r, lkb, DLM_LOCK_CW);
2629                         else
2630                                 queue_bast(r, lkb, high);
2631                         lkb->lkb_highbast = high;
2632                 }
2633         }
2634 }
2635
2636 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
2637 {
2638         if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
2639             (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
2640                 if (gr->lkb_highbast < DLM_LOCK_EX)
2641                         return 1;
2642                 return 0;
2643         }
2644
2645         if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
2646                 return 1;
2647         return 0;
2648 }
2649
2650 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
2651                             struct dlm_lkb *lkb)
2652 {
2653         struct dlm_lkb *gr;
2654
2655         list_for_each_entry(gr, head, lkb_statequeue) {
2656                 /* skip self when sending basts to convertqueue */
2657                 if (gr == lkb)
2658                         continue;
2659                 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
2660                         queue_bast(r, gr, lkb->lkb_rqmode);
2661                         gr->lkb_highbast = lkb->lkb_rqmode;
2662                 }
2663         }
2664 }
2665
2666 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
2667 {
2668         send_bast_queue(r, &r->res_grantqueue, lkb);
2669 }
2670
2671 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2672 {
2673         send_bast_queue(r, &r->res_grantqueue, lkb);
2674         send_bast_queue(r, &r->res_convertqueue, lkb);
2675 }
2676
2677 /* set_master(r, lkb) -- set the master nodeid of a resource
2678
2679    The purpose of this function is to set the nodeid field in the given
2680    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
2681    known, it can just be copied to the lkb and the function will return
2682    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
2683    before it can be copied to the lkb.
2684
2685    When the rsb nodeid is being looked up remotely, the initial lkb
2686    causing the lookup is kept on the ls_waiters list waiting for the
2687    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
2688    on the rsb's res_lookup list until the master is verified.
2689
2690    Return values:
2691    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
2692    1: the rsb master is not available and the lkb has been placed on
2693       a wait queue
2694 */
2695
2696 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2697 {
2698         int our_nodeid = dlm_our_nodeid();
2699
2700         if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
2701                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
2702                 r->res_first_lkid = lkb->lkb_id;
2703                 lkb->lkb_nodeid = r->res_nodeid;
2704                 return 0;
2705         }
2706
2707         if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
2708                 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2709                 return 1;
2710         }
2711
2712         if (r->res_master_nodeid == our_nodeid) {
2713                 lkb->lkb_nodeid = 0;
2714                 return 0;
2715         }
2716
2717         if (r->res_master_nodeid) {
2718                 lkb->lkb_nodeid = r->res_master_nodeid;
2719                 return 0;
2720         }
2721
2722         if (dlm_dir_nodeid(r) == our_nodeid) {
2723                 /* This is a somewhat unusual case; find_rsb will usually
2724                    have set res_master_nodeid when dir nodeid is local, but
2725                    there are cases where we become the dir node after we've
2726                    past find_rsb and go through _request_lock again.
2727                    confirm_master() or process_lookup_list() needs to be
2728                    called after this. */
2729                 log_debug(r->res_ls, "set_master %x self master %d dir %d %s",
2730                           lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
2731                           r->res_name);
2732                 r->res_master_nodeid = our_nodeid;
2733                 r->res_nodeid = 0;
2734                 lkb->lkb_nodeid = 0;
2735                 return 0;
2736         }
2737
2738         wait_pending_remove(r);
2739
2740         r->res_first_lkid = lkb->lkb_id;
2741         send_lookup(r, lkb);
2742         return 1;
2743 }
2744
2745 static void process_lookup_list(struct dlm_rsb *r)
2746 {
2747         struct dlm_lkb *lkb, *safe;
2748
2749         list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2750                 list_del_init(&lkb->lkb_rsb_lookup);
2751                 _request_lock(r, lkb);
2752                 schedule();
2753         }
2754 }
2755
2756 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
2757
2758 static void confirm_master(struct dlm_rsb *r, int error)
2759 {
2760         struct dlm_lkb *lkb;
2761
2762         if (!r->res_first_lkid)
2763                 return;
2764
2765         switch (error) {
2766         case 0:
2767         case -EINPROGRESS:
2768                 r->res_first_lkid = 0;
2769                 process_lookup_list(r);
2770                 break;
2771
2772         case -EAGAIN:
2773         case -EBADR:
2774         case -ENOTBLK:
2775                 /* the remote request failed and won't be retried (it was
2776                    a NOQUEUE, or has been canceled/unlocked); make a waiting
2777                    lkb the first_lkid */
2778
2779                 r->res_first_lkid = 0;
2780
2781                 if (!list_empty(&r->res_lookup)) {
2782                         lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2783                                          lkb_rsb_lookup);
2784                         list_del_init(&lkb->lkb_rsb_lookup);
2785                         r->res_first_lkid = lkb->lkb_id;
2786                         _request_lock(r, lkb);
2787                 }
2788                 break;
2789
2790         default:
2791                 log_error(r->res_ls, "confirm_master unknown error %d", error);
2792         }
2793 }
2794
2795 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2796                          int namelen, unsigned long timeout_cs,
2797                          void (*ast) (void *astparam),
2798                          void *astparam,
2799                          void (*bast) (void *astparam, int mode),
2800                          struct dlm_args *args)
2801 {
2802         int rv = -EINVAL;
2803
2804         /* check for invalid arg usage */
2805
2806         if (mode < 0 || mode > DLM_LOCK_EX)
2807                 goto out;
2808
2809         if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2810                 goto out;
2811
2812         if (flags & DLM_LKF_CANCEL)
2813                 goto out;
2814
2815         if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2816                 goto out;
2817
2818         if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2819                 goto out;
2820
2821         if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2822                 goto out;
2823
2824         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2825                 goto out;
2826
2827         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2828                 goto out;
2829
2830         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2831                 goto out;
2832
2833         if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2834                 goto out;
2835
2836         if (!ast || !lksb)
2837                 goto out;
2838
2839         if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2840                 goto out;
2841
2842         if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2843                 goto out;
2844
2845         /* these args will be copied to the lkb in validate_lock_args,
2846            it cannot be done now because when converting locks, fields in
2847            an active lkb cannot be modified before locking the rsb */
2848
2849         args->flags = flags;
2850         args->astfn = ast;
2851         args->astparam = astparam;
2852         args->bastfn = bast;
2853         args->timeout = timeout_cs;
2854         args->mode = mode;
2855         args->lksb = lksb;
2856         rv = 0;
2857  out:
2858         return rv;
2859 }
2860
2861 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2862 {
2863         if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2864                       DLM_LKF_FORCEUNLOCK))
2865                 return -EINVAL;
2866
2867         if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2868                 return -EINVAL;
2869
2870         args->flags = flags;
2871         args->astparam = astarg;
2872         return 0;
2873 }
2874
2875 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2876                               struct dlm_args *args)
2877 {
2878         int rv = -EINVAL;
2879
2880         if (args->flags & DLM_LKF_CONVERT) {
2881                 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2882                         goto out;
2883
2884                 if (args->flags & DLM_LKF_QUECVT &&
2885                     !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2886                         goto out;
2887
2888                 rv = -EBUSY;
2889                 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2890                         goto out;
2891
2892                 if (lkb->lkb_wait_type)
2893                         goto out;
2894
2895                 if (is_overlap(lkb))
2896                         goto out;
2897         }
2898
2899         lkb->lkb_exflags = args->flags;
2900         lkb->lkb_sbflags = 0;
2901         lkb->lkb_astfn = args->astfn;
2902         lkb->lkb_astparam = args->astparam;
2903         lkb->lkb_bastfn = args->bastfn;
2904         lkb->lkb_rqmode = args->mode;
2905         lkb->lkb_lksb = args->lksb;
2906         lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2907         lkb->lkb_ownpid = (int) current->pid;
2908         lkb->lkb_timeout_cs = args->timeout;
2909         rv = 0;
2910  out:
2911         if (rv)
2912                 log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
2913                           rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
2914                           lkb->lkb_status, lkb->lkb_wait_type,
2915                           lkb->lkb_resource->res_name);
2916         return rv;
2917 }
2918
2919 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2920    for success */
2921
2922 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2923    because there may be a lookup in progress and it's valid to do
2924    cancel/unlockf on it */
2925
2926 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2927 {
2928         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2929         int rv = -EINVAL;
2930
2931         if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2932                 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2933                 dlm_print_lkb(lkb);
2934                 goto out;
2935         }
2936
2937         /* an lkb may still exist even though the lock is EOL'ed due to a
2938            cancel, unlock or failed noqueue request; an app can't use these
2939            locks; return same error as if the lkid had not been found at all */
2940
2941         if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2942                 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2943                 rv = -ENOENT;
2944                 goto out;
2945         }
2946
2947         /* an lkb may be waiting for an rsb lookup to complete where the
2948            lookup was initiated by another lock */
2949
2950         if (!list_empty(&lkb->lkb_rsb_lookup)) {
2951                 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2952                         log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2953                         list_del_init(&lkb->lkb_rsb_lookup);
2954                         queue_cast(lkb->lkb_resource, lkb,
2955                                    args->flags & DLM_LKF_CANCEL ?
2956                                    -DLM_ECANCEL : -DLM_EUNLOCK);
2957                         unhold_lkb(lkb); /* undoes create_lkb() */
2958                 }
2959                 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2960                 rv = -EBUSY;
2961                 goto out;
2962         }
2963
2964         /* cancel not allowed with another cancel/unlock in progress */
2965
2966         if (args->flags & DLM_LKF_CANCEL) {
2967                 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2968                         goto out;
2969
2970                 if (is_overlap(lkb))
2971                         goto out;
2972
2973                 /* don't let scand try to do a cancel */
2974                 del_timeout(lkb);
2975
2976                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2977                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2978                         rv = -EBUSY;
2979                         goto out;
2980                 }
2981
2982                 /* there's nothing to cancel */
2983                 if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2984                     !lkb->lkb_wait_type) {
2985                         rv = -EBUSY;
2986                         goto out;
2987                 }
2988
2989                 switch (lkb->lkb_wait_type) {
2990                 case DLM_MSG_LOOKUP:
2991                 case DLM_MSG_REQUEST:
2992                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2993                         rv = -EBUSY;
2994                         goto out;
2995                 case DLM_MSG_UNLOCK:
2996                 case DLM_MSG_CANCEL:
2997                         goto out;
2998                 }
2999                 /* add_to_waiters() will set OVERLAP_CANCEL */
3000                 goto out_ok;
3001         }
3002
3003         /* do we need to allow a force-unlock if there's a normal unlock
3004            already in progress?  in what conditions could the normal unlock
3005            fail such that we'd want to send a force-unlock to be sure? */
3006
3007         if (args->flags & DLM_LKF_FORCEUNLOCK) {
3008                 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
3009                         goto out;
3010
3011                 if (is_overlap_unlock(lkb))
3012                         goto out;
3013
3014                 /* don't let scand try to do a cancel */
3015                 del_timeout(lkb);
3016
3017                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
3018                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
3019                         rv = -EBUSY;
3020                         goto out;
3021                 }
3022
3023                 switch (lkb->lkb_wait_type) {
3024                 case DLM_MSG_LOOKUP:
3025                 case DLM_MSG_REQUEST:
3026                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
3027                         rv = -EBUSY;
3028                         goto out;
3029                 case DLM_MSG_UNLOCK:
3030                         goto out;
3031                 }
3032                 /* add_to_waiters() will set OVERLAP_UNLOCK */
3033                 goto out_ok;
3034         }
3035
3036         /* normal unlock not allowed if there's any op in progress */
3037         rv = -EBUSY;
3038         if (lkb->lkb_wait_type || lkb->lkb_wait_count)
3039                 goto out;
3040
3041  out_ok:
3042         /* an overlapping op shouldn't blow away exflags from other op */
3043         lkb->lkb_exflags |= args->flags;
3044         lkb->lkb_sbflags = 0;
3045         lkb->lkb_astparam = args->astparam;
3046         rv = 0;
3047  out:
3048         if (rv)
3049                 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
3050                           lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
3051                           args->flags, lkb->lkb_wait_type,
3052                           lkb->lkb_resource->res_name);
3053         return rv;
3054 }
3055
3056 /*
3057  * Four stage 4 varieties:
3058  * do_request(), do_convert(), do_unlock(), do_cancel()
3059  * These are called on the master node for the given lock and
3060  * from the central locking logic.
3061  */
3062
3063 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3064 {
3065         int error = 0;
3066
3067         if (can_be_granted(r, lkb, 1, 0, NULL)) {
3068                 grant_lock(r, lkb);
3069                 queue_cast(r, lkb, 0);
3070                 goto out;
3071         }
3072
3073         if (can_be_queued(lkb)) {
3074                 error = -EINPROGRESS;
3075                 add_lkb(r, lkb, DLM_LKSTS_WAITING);
3076                 add_timeout(lkb);
3077                 goto out;
3078         }
3079
3080         error = -EAGAIN;
3081         queue_cast(r, lkb, -EAGAIN);
3082  out:
3083         return error;
3084 }
3085
3086 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3087                                int error)
3088 {
3089         switch (error) {
3090         case -EAGAIN:
3091                 if (force_blocking_asts(lkb))
3092                         send_blocking_asts_all(r, lkb);
3093                 break;
3094         case -EINPROGRESS:
3095                 send_blocking_asts(r, lkb);
3096                 break;
3097         }
3098 }
3099
3100 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3101 {
3102         int error = 0;
3103         int deadlk = 0;
3104
3105         /* changing an existing lock may allow others to be granted */
3106
3107         if (can_be_granted(r, lkb, 1, 0, &deadlk)) {
3108                 grant_lock(r, lkb);
3109                 queue_cast(r, lkb, 0);
3110                 goto out;
3111         }
3112
3113         /* can_be_granted() detected that this lock would block in a conversion
3114            deadlock, so we leave it on the granted queue and return EDEADLK in
3115            the ast for the convert. */
3116
3117         if (deadlk) {
3118                 /* it's left on the granted queue */
3119                 revert_lock(r, lkb);
3120                 queue_cast(r, lkb, -EDEADLK);
3121                 error = -EDEADLK;
3122                 goto out;
3123         }
3124
3125         /* is_demoted() means the can_be_granted() above set the grmode
3126            to NL, and left us on the granted queue.  This auto-demotion
3127            (due to CONVDEADLK) might mean other locks, and/or this lock, are
3128            now grantable.  We have to try to grant other converting locks
3129            before we try again to grant this one. */
3130
3131         if (is_demoted(lkb)) {
3132                 grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
3133                 if (_can_be_granted(r, lkb, 1, 0)) {
3134                         grant_lock(r, lkb);
3135                         queue_cast(r, lkb, 0);
3136                         goto out;
3137                 }
3138                 /* else fall through and move to convert queue */
3139         }
3140
3141         if (can_be_queued(lkb)) {
3142                 error = -EINPROGRESS;
3143                 del_lkb(r, lkb);
3144                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3145                 add_timeout(lkb);
3146                 goto out;
3147         }
3148
3149         error = -EAGAIN;
3150         queue_cast(r, lkb, -EAGAIN);
3151  out:
3152         return error;
3153 }
3154
3155 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3156                                int error)
3157 {
3158         switch (error) {
3159         case 0:
3160                 grant_pending_locks(r, NULL);
3161                 /* grant_pending_locks also sends basts */
3162                 break;
3163         case -EAGAIN:
3164                 if (force_blocking_asts(lkb))
3165                         send_blocking_asts_all(r, lkb);
3166                 break;
3167         case -EINPROGRESS:
3168                 send_blocking_asts(r, lkb);
3169                 break;
3170         }
3171 }
3172
3173 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3174 {
3175         remove_lock(r, lkb);
3176         queue_cast(r, lkb, -DLM_EUNLOCK);
3177         return -DLM_EUNLOCK;
3178 }
3179
3180 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3181                               int error)
3182 {
3183         grant_pending_locks(r, NULL);
3184 }
3185
3186 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
3187
3188 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3189 {
3190         int error;
3191
3192         error = revert_lock(r, lkb);
3193         if (error) {
3194                 queue_cast(r, lkb, -DLM_ECANCEL);
3195                 return -DLM_ECANCEL;
3196         }
3197         return 0;
3198 }
3199
3200 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3201                               int error)
3202 {
3203         if (error)
3204                 grant_pending_locks(r, NULL);
3205 }
3206
3207 /*
3208  * Four stage 3 varieties:
3209  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
3210  */
3211
3212 /* add a new lkb to a possibly new rsb, called by requesting process */
3213
3214 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3215 {
3216         int error;
3217
3218         /* set_master: sets lkb nodeid from r */
3219
3220         error = set_master(r, lkb);
3221         if (error < 0)
3222                 goto out;
3223         if (error) {
3224                 error = 0;
3225                 goto out;
3226         }
3227
3228         if (is_remote(r)) {
3229                 /* receive_request() calls do_request() on remote node */
3230                 error = send_request(r, lkb);
3231         } else {
3232                 error = do_request(r, lkb);
3233                 /* for remote locks the request_reply is sent
3234                    between do_request and do_request_effects */
3235                 do_request_effects(r, lkb, error);
3236         }
3237  out:
3238         return error;
3239 }
3240
3241 /* change some property of an existing lkb, e.g. mode */
3242
3243 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3244 {
3245         int error;
3246
3247         if (is_remote(r)) {
3248                 /* receive_convert() calls do_convert() on remote node */
3249                 error = send_convert(r, lkb);
3250         } else {
3251                 error = do_convert(r, lkb);
3252                 /* for remote locks the convert_reply is sent
3253                    between do_convert and do_convert_effects */
3254                 do_convert_effects(r, lkb, error);
3255         }
3256
3257         return error;
3258 }
3259
3260 /* remove an existing lkb from the granted queue */
3261
3262 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3263 {
3264         int error;
3265
3266         if (is_remote(r)) {
3267                 /* receive_unlock() calls do_unlock() on remote node */
3268                 error = send_unlock(r, lkb);
3269         } else {
3270                 error = do_unlock(r, lkb);
3271                 /* for remote locks the unlock_reply is sent
3272                    between do_unlock and do_unlock_effects */
3273                 do_unlock_effects(r, lkb, error);
3274         }
3275
3276         return error;
3277 }
3278
3279 /* remove an existing lkb from the convert or wait queue */
3280
3281 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3282 {
3283         int error;
3284
3285         if (is_remote(r)) {
3286                 /* receive_cancel() calls do_cancel() on remote node */
3287                 error = send_cancel(r, lkb);
3288         } else {
3289                 error = do_cancel(r, lkb);
3290                 /* for remote locks the cancel_reply is sent
3291                    between do_cancel and do_cancel_effects */
3292                 do_cancel_effects(r, lkb, error);
3293         }
3294
3295         return error;
3296 }
3297
3298 /*
3299  * Four stage 2 varieties:
3300  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
3301  */
3302
3303 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
3304                         int len, struct dlm_args *args)
3305 {
3306         struct dlm_rsb *r;
3307         int error;
3308
3309         error = validate_lock_args(ls, lkb, args);
3310         if (error)
3311                 return error;
3312
3313         error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
3314         if (error)
3315                 return error;
3316
3317         lock_rsb(r);
3318
3319         attach_lkb(r, lkb);
3320         lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
3321
3322         error = _request_lock(r, lkb);
3323
3324         unlock_rsb(r);
3325         put_rsb(r);
3326         return error;
3327 }
3328
3329 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3330                         struct dlm_args *args)
3331 {
3332         struct dlm_rsb *r;
3333         int error;
3334
3335         r = lkb->lkb_resource;
3336
3337         hold_rsb(r);
3338         lock_rsb(r);
3339
3340         error = validate_lock_args(ls, lkb, args);
3341         if (error)
3342                 goto out;
3343
3344         error = _convert_lock(r, lkb);
3345  out:
3346         unlock_rsb(r);
3347         put_rsb(r);
3348         return error;
3349 }
3350
3351 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3352                        struct dlm_args *args)
3353 {
3354         struct dlm_rsb *r;
3355         int error;
3356
3357         r = lkb->lkb_resource;
3358
3359         hold_rsb(r);
3360         lock_rsb(r);
3361
3362         error = validate_unlock_args(lkb, args);
3363         if (error)
3364                 goto out;
3365
3366         error = _unlock_lock(r, lkb);
3367  out:
3368         unlock_rsb(r);
3369         put_rsb(r);
3370         return error;
3371 }
3372
3373 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3374                        struct dlm_args *args)
3375 {
3376         struct dlm_rsb *r;
3377         int error;
3378
3379         r = lkb->lkb_resource;
3380
3381         hold_rsb(r);
3382         lock_rsb(r);
3383
3384         error = validate_unlock_args(lkb, args);
3385         if (error)
3386                 goto out;
3387
3388         error = _cancel_lock(r, lkb);
3389  out:
3390         unlock_rsb(r);
3391         put_rsb(r);
3392         return error;
3393 }
3394
3395 /*
3396  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
3397  */
3398
3399 int dlm_lock(dlm_lockspace_t *lockspace,
3400              int mode,
3401              struct dlm_lksb *lksb,
3402              uint32_t flags,
3403              void *name,
3404              unsigned int namelen,
3405              uint32_t parent_lkid,
3406              void (*ast) (void *astarg),
3407              void *astarg,
3408              void (*bast) (void *astarg, int mode))
3409 {
3410         struct dlm_ls *ls;
3411         struct dlm_lkb *lkb;
3412         struct dlm_args args;
3413         int error, convert = flags & DLM_LKF_CONVERT;
3414
3415         ls = dlm_find_lockspace_local(lockspace);
3416         if (!ls)
3417                 return -EINVAL;
3418
3419         dlm_lock_recovery(ls);
3420
3421         if (convert)
3422                 error = find_lkb(ls, lksb->sb_lkid, &lkb);
3423         else
3424                 error = create_lkb(ls, &lkb);
3425
3426         if (error)
3427                 goto out;
3428
3429         error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
3430                               astarg, bast, &args);
3431         if (error)
3432                 goto out_put;
3433
3434         if (convert)
3435                 error = convert_lock(ls, lkb, &args);
3436         else
3437                 error = request_lock(ls, lkb, name, namelen, &args);
3438
3439         if (error == -EINPROGRESS)
3440                 error = 0;
3441  out_put:
3442         if (convert || error)
3443                 __put_lkb(ls, lkb);
3444         if (error == -EAGAIN || error == -EDEADLK)
3445                 error = 0;
3446  out:
3447         dlm_unlock_recovery(ls);
3448         dlm_put_lockspace(ls);
3449         return error;
3450 }
3451
3452 int dlm_unlock(dlm_lockspace_t *lockspace,
3453                uint32_t lkid,
3454                uint32_t flags,
3455                struct dlm_lksb *lksb,
3456                void *astarg)
3457 {
3458         struct dlm_ls *ls;
3459         struct dlm_lkb *lkb;
3460         struct dlm_args args;
3461         int error;
3462
3463         ls = dlm_find_lockspace_local(lockspace);
3464         if (!ls)
3465                 return -EINVAL;
3466
3467         dlm_lock_recovery(ls);
3468
3469         error = find_lkb(ls, lkid, &lkb);
3470         if (error)
3471                 goto out;
3472
3473         error = set_unlock_args(flags, astarg, &args);
3474         if (error)
3475                 goto out_put;
3476
3477         if (flags & DLM_LKF_CANCEL)
3478                 error = cancel_lock(ls, lkb, &args);
3479         else
3480                 error = unlock_lock(ls, lkb, &args);
3481
3482         if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
3483                 error = 0;
3484         if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
3485                 error = 0;
3486  out_put:
3487         dlm_put_lkb(lkb);
3488  out:
3489         dlm_unlock_recovery(ls);
3490         dlm_put_lockspace(ls);
3491         return error;
3492 }
3493
3494 /*
3495  * send/receive routines for remote operations and replies
3496  *
3497  * send_args
3498  * send_common
3499  * send_request                 receive_request
3500  * send_convert                 receive_convert
3501  * send_unlock                  receive_unlock
3502  * send_cancel                  receive_cancel
3503  * send_grant                   receive_grant
3504  * send_bast                    receive_bast
3505  * send_lookup                  receive_lookup
3506  * send_remove                  receive_remove
3507  *
3508  *                              send_common_reply
3509  * receive_request_reply        send_request_reply
3510  * receive_convert_reply        send_convert_reply
3511  * receive_unlock_reply         send_unlock_reply
3512  * receive_cancel_reply         send_cancel_reply
3513  * receive_lookup_reply         send_lookup_reply
3514  */
3515
3516 static int _create_message(struct dlm_ls *ls, int mb_len,
3517                            int to_nodeid, int mstype,
3518                            struct dlm_message **ms_ret,
3519                            struct dlm_mhandle **mh_ret)
3520 {
3521         struct dlm_message *ms;
3522         struct dlm_mhandle *mh;
3523         char *mb;
3524
3525         /* get_buffer gives us a message handle (mh) that we need to
3526            pass into lowcomms_commit and a message buffer (mb) that we
3527            write our data into */
3528
3529         mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
3530         if (!mh)
3531                 return -ENOBUFS;
3532
3533         memset(mb, 0, mb_len);
3534
3535         ms = (struct dlm_message *) mb;
3536
3537         ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
3538         ms->m_header.h_lockspace = ls->ls_global_id;
3539         ms->m_header.h_nodeid = dlm_our_nodeid();
3540         ms->m_header.h_length = mb_len;
3541         ms->m_header.h_cmd = DLM_MSG;
3542
3543         ms->m_type = mstype;
3544
3545         *mh_ret = mh;
3546         *ms_ret = ms;
3547         return 0;
3548 }
3549
3550 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
3551                           int to_nodeid, int mstype,
3552                           struct dlm_message **ms_ret,
3553                           struct dlm_mhandle **mh_ret)
3554 {
3555         int mb_len = sizeof(struct dlm_message);
3556
3557         switch (mstype) {
3558         case DLM_MSG_REQUEST:
3559         case DLM_MSG_LOOKUP:
3560         case DLM_MSG_REMOVE:
3561                 mb_len += r->res_length;
3562                 break;
3563         case DLM_MSG_CONVERT:
3564         case DLM_MSG_UNLOCK:
3565         case DLM_MSG_REQUEST_REPLY:
3566         case DLM_MSG_CONVERT_REPLY:
3567         case DLM_MSG_GRANT:
3568                 if (lkb && lkb->lkb_lvbptr)
3569                         mb_len += r->res_ls->ls_lvblen;
3570                 break;
3571         }
3572
3573         return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
3574                                ms_ret, mh_ret);
3575 }
3576
3577 /* further lowcomms enhancements or alternate implementations may make
3578    the return value from this function useful at some point */
3579
3580 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
3581 {
3582         dlm_message_out(ms);
3583         dlm_lowcomms_commit_buffer(mh);
3584         return 0;
3585 }
3586
3587 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
3588                       struct dlm_message *ms)
3589 {
3590         ms->m_nodeid   = lkb->lkb_nodeid;
3591         ms->m_pid      = lkb->lkb_ownpid;
3592         ms->m_lkid     = lkb->lkb_id;
3593         ms->m_remid    = lkb->lkb_remid;
3594         ms->m_exflags  = lkb->lkb_exflags;
3595         ms->m_sbflags  = lkb->lkb_sbflags;
3596         ms->m_flags    = lkb->lkb_flags;
3597         ms->m_lvbseq   = lkb->lkb_lvbseq;
3598         ms->m_status   = lkb->lkb_status;
3599         ms->m_grmode   = lkb->lkb_grmode;
3600         ms->m_rqmode   = lkb->lkb_rqmode;
3601         ms->m_hash     = r->res_hash;
3602
3603         /* m_result and m_bastmode are set from function args,
3604            not from lkb fields */
3605
3606         if (lkb->lkb_bastfn)
3607                 ms->m_asts |= DLM_CB_BAST;
3608         if (lkb->lkb_astfn)
3609                 ms->m_asts |= DLM_CB_CAST;
3610
3611         /* compare with switch in create_message; send_remove() doesn't
3612            use send_args() */
3613
3614         switch (ms->m_type) {
3615         case DLM_MSG_REQUEST:
3616         case DLM_MSG_LOOKUP:
3617                 memcpy(ms->m_extra, r->res_name, r->res_length);
3618                 break;
3619         case DLM_MSG_CONVERT:
3620         case DLM_MSG_UNLOCK:
3621         case DLM_MSG_REQUEST_REPLY:
3622         case DLM_MSG_CONVERT_REPLY:
3623         case DLM_MSG_GRANT:
3624                 if (!lkb->lkb_lvbptr)
3625                         break;
3626                 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
3627                 break;
3628         }
3629 }
3630
3631 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
3632 {
3633         struct dlm_message *ms;
3634         struct dlm_mhandle *mh;
3635         int to_nodeid, error;
3636
3637         to_nodeid = r->res_nodeid;
3638
3639         error = add_to_waiters(lkb, mstype, to_nodeid);
3640         if (error)
3641                 return error;
3642
3643         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3644         if (error)
3645                 goto fail;
3646
3647         send_args(r, lkb, ms);
3648
3649         error = send_message(mh, ms);
3650         if (error)
3651                 goto fail;
3652         return 0;
3653
3654  fail:
3655         remove_from_waiters(lkb, msg_reply_type(mstype));
3656         return error;
3657 }
3658
3659 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3660 {
3661         return send_common(r, lkb, DLM_MSG_REQUEST);
3662 }
3663
3664 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3665 {
3666         int error;
3667
3668         error = send_common(r, lkb, DLM_MSG_CONVERT);
3669
3670         /* down conversions go without a reply from the master */
3671         if (!error && down_conversion(lkb)) {
3672                 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
3673                 r->res_ls->ls_stub_ms.m_flags = DLM_IFL_STUB_MS;
3674                 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3675                 r->res_ls->ls_stub_ms.m_result = 0;
3676                 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
3677         }
3678
3679         return error;
3680 }
3681
3682 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
3683    MASTER_UNCERTAIN to force the next request on the rsb to confirm
3684    that the master is still correct. */
3685
3686 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3687 {
3688         return send_common(r, lkb, DLM_MSG_UNLOCK);
3689 }
3690
3691 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3692 {
3693         return send_common(r, lkb, DLM_MSG_CANCEL);
3694 }
3695
3696 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3697 {
3698         struct dlm_message *ms;
3699         struct dlm_mhandle *mh;
3700         int to_nodeid, error;
3701
3702         to_nodeid = lkb->lkb_nodeid;
3703
3704         error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
3705         if (error)
3706                 goto out;
3707
3708         send_args(r, lkb, ms);
3709
3710         ms->m_result = 0;
3711
3712         error = send_message(mh, ms);
3713  out:
3714         return error;
3715 }
3716
3717 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3718 {
3719         struct dlm_message *ms;
3720         struct dlm_mhandle *mh;
3721         int to_nodeid, error;
3722
3723         to_nodeid = lkb->lkb_nodeid;
3724
3725         error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
3726         if (error)
3727                 goto out;
3728
3729         send_args(r, lkb, ms);
3730
3731         ms->m_bastmode = mode;
3732
3733         error = send_message(mh, ms);
3734  out:
3735         return error;
3736 }
3737
3738 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3739 {
3740         struct dlm_message *ms;
3741         struct dlm_mhandle *mh;
3742         int to_nodeid, error;
3743
3744         to_nodeid = dlm_dir_nodeid(r);
3745
3746         error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3747         if (error)
3748                 return error;
3749
3750         error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3751         if (error)
3752                 goto fail;
3753
3754         send_args(r, lkb, ms);
3755
3756         error = send_message(mh, ms);
3757         if (error)
3758                 goto fail;
3759         return 0;
3760
3761  fail:
3762         remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3763         return error;
3764 }
3765
3766 static int send_remove(struct dlm_rsb *r)
3767 {
3768         struct dlm_message *ms;
3769         struct dlm_mhandle *mh;
3770         int to_nodeid, error;
3771
3772         to_nodeid = dlm_dir_nodeid(r);
3773
3774         error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3775         if (error)
3776                 goto out;
3777
3778         memcpy(ms->m_extra, r->res_name, r->res_length);
3779         ms->m_hash = r->res_hash;
3780
3781         error = send_message(mh, ms);
3782  out:
3783         return error;
3784 }
3785
3786 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3787                              int mstype, int rv)
3788 {
3789         struct dlm_message *ms;
3790         struct dlm_mhandle *mh;
3791         int to_nodeid, error;
3792
3793         to_nodeid = lkb->lkb_nodeid;
3794
3795         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3796         if (error)
3797                 goto out;
3798
3799         send_args(r, lkb, ms);
3800
3801         ms->m_result = rv;
3802
3803         error = send_message(mh, ms);
3804  out:
3805         return error;
3806 }
3807
3808 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3809 {
3810         return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3811 }
3812
3813 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3814 {
3815         return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3816 }
3817
3818 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3819 {
3820         return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3821 }
3822
3823 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3824 {
3825         return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3826 }
3827
3828 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
3829                              int ret_nodeid, int rv)
3830 {
3831         struct dlm_rsb *r = &ls->ls_stub_rsb;
3832         struct dlm_message *ms;
3833         struct dlm_mhandle *mh;
3834         int error, nodeid = ms_in->m_header.h_nodeid;
3835
3836         error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3837         if (error)
3838                 goto out;
3839
3840         ms->m_lkid = ms_in->m_lkid;
3841         ms->m_result = rv;
3842         ms->m_nodeid = ret_nodeid;
3843
3844         error = send_message(mh, ms);
3845  out:
3846         return error;
3847 }
3848
3849 /* which args we save from a received message depends heavily on the type
3850    of message, unlike the send side where we can safely send everything about
3851    the lkb for any type of message */
3852
3853 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3854 {
3855         lkb->lkb_exflags = ms->m_exflags;
3856         lkb->lkb_sbflags = ms->m_sbflags;
3857         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3858                          (ms->m_flags & 0x0000FFFF);
3859 }
3860
3861 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3862 {
3863         if (ms->m_flags == DLM_IFL_STUB_MS)
3864                 return;
3865
3866         lkb->lkb_sbflags = ms->m_sbflags;
3867         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3868                          (ms->m_flags & 0x0000FFFF);
3869 }
3870
3871 static int receive_extralen(struct dlm_message *ms)
3872 {
3873         return (ms->m_header.h_length - sizeof(struct dlm_message));
3874 }
3875
3876 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3877                        struct dlm_message *ms)
3878 {
3879         int len;
3880
3881         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3882                 if (!lkb->lkb_lvbptr)
3883                         lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3884                 if (!lkb->lkb_lvbptr)
3885                         return -ENOMEM;
3886                 len = receive_extralen(ms);
3887                 if (len > DLM_RESNAME_MAXLEN)
3888                         len = DLM_RESNAME_MAXLEN;
3889                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3890         }
3891         return 0;
3892 }
3893
3894 static void fake_bastfn(void *astparam, int mode)
3895 {
3896         log_print("fake_bastfn should not be called");
3897 }
3898
3899 static void fake_astfn(void *astparam)
3900 {
3901         log_print("fake_astfn should not be called");
3902 }
3903
3904 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3905                                 struct dlm_message *ms)
3906 {
3907         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3908         lkb->lkb_ownpid = ms->m_pid;
3909         lkb->lkb_remid = ms->m_lkid;
3910         lkb->lkb_grmode = DLM_LOCK_IV;
3911         lkb->lkb_rqmode = ms->m_rqmode;
3912
3913         lkb->lkb_bastfn = (ms->m_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
3914         lkb->lkb_astfn = (ms->m_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
3915
3916         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3917                 /* lkb was just created so there won't be an lvb yet */
3918                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3919                 if (!lkb->lkb_lvbptr)
3920                         return -ENOMEM;
3921         }
3922
3923         return 0;
3924 }
3925
3926 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3927                                 struct dlm_message *ms)
3928 {
3929         if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3930                 return -EBUSY;
3931
3932         if (receive_lvb(ls, lkb, ms))
3933                 return -ENOMEM;
3934
3935         lkb->lkb_rqmode = ms->m_rqmode;
3936         lkb->lkb_lvbseq = ms->m_lvbseq;
3937
3938         return 0;
3939 }
3940
3941 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3942                                struct dlm_message *ms)
3943 {
3944         if (receive_lvb(ls, lkb, ms))
3945                 return -ENOMEM;
3946         return 0;
3947 }
3948
3949 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3950    uses to send a reply and that the remote end uses to process the reply. */
3951
3952 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3953 {
3954         struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3955         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3956         lkb->lkb_remid = ms->m_lkid;
3957 }
3958
3959 /* This is called after the rsb is locked so that we can safely inspect
3960    fields in the lkb. */
3961
3962 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3963 {
3964         int from = ms->m_header.h_nodeid;
3965         int error = 0;
3966
3967         switch (ms->m_type) {
3968         case DLM_MSG_CONVERT:
3969         case DLM_MSG_UNLOCK:
3970         case DLM_MSG_CANCEL:
3971                 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3972                         error = -EINVAL;
3973                 break;
3974
3975         case DLM_MSG_CONVERT_REPLY:
3976         case DLM_MSG_UNLOCK_REPLY:
3977         case DLM_MSG_CANCEL_REPLY:
3978         case DLM_MSG_GRANT:
3979         case DLM_MSG_BAST:
3980                 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3981                         error = -EINVAL;
3982                 break;
3983
3984         case DLM_MSG_REQUEST_REPLY:
3985                 if (!is_process_copy(lkb))
3986                         error = -EINVAL;
3987                 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3988                         error = -EINVAL;
3989                 break;
3990
3991         default:
3992                 error = -EINVAL;
3993         }
3994
3995         if (error)
3996                 log_error(lkb->lkb_resource->res_ls,
3997                           "ignore invalid message %d from %d %x %x %x %d",
3998                           ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
3999                           lkb->lkb_flags, lkb->lkb_nodeid);
4000         return error;
4001 }
4002
4003 static void send_repeat_remove(struct dlm_ls *ls, char *ms_name, int len)
4004 {
4005         char name[DLM_RESNAME_MAXLEN + 1];
4006         struct dlm_message *ms;
4007         struct dlm_mhandle *mh;
4008         struct dlm_rsb *r;
4009         uint32_t hash, b;
4010         int rv, dir_nodeid;
4011
4012         memset(name, 0, sizeof(name));
4013         memcpy(name, ms_name, len);
4014
4015         hash = jhash(name, len, 0);
4016         b = hash & (ls->ls_rsbtbl_size - 1);
4017
4018         dir_nodeid = dlm_hash2nodeid(ls, hash);
4019
4020         log_error(ls, "send_repeat_remove dir %d %s", dir_nodeid, name);
4021
4022         spin_lock(&ls->ls_rsbtbl[b].lock);
4023         rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
4024         if (!rv) {
4025                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4026                 log_error(ls, "repeat_remove on keep %s", name);
4027                 return;
4028         }
4029
4030         rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
4031         if (!rv) {
4032                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4033                 log_error(ls, "repeat_remove on toss %s", name);
4034                 return;
4035         }
4036
4037         /* use ls->remove_name2 to avoid conflict with shrink? */
4038
4039         spin_lock(&ls->ls_remove_spin);
4040         ls->ls_remove_len = len;
4041         memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
4042         spin_unlock(&ls->ls_remove_spin);
4043         spin_unlock(&ls->ls_rsbtbl[b].lock);
4044
4045         rv = _create_message(ls, sizeof(struct dlm_message) + len,
4046                              dir_nodeid, DLM_MSG_REMOVE, &ms, &mh);
4047         if (rv)
4048                 return;
4049
4050         memcpy(ms->m_extra, name, len);
4051         ms->m_hash = hash;
4052
4053         send_message(mh, ms);
4054
4055         spin_lock(&ls->ls_remove_spin);
4056         ls->ls_remove_len = 0;
4057         memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
4058         spin_unlock(&ls->ls_remove_spin);
4059 }
4060
4061 static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
4062 {
4063         struct dlm_lkb *lkb;
4064         struct dlm_rsb *r;
4065         int from_nodeid;
4066         int error, namelen = 0;
4067
4068         from_nodeid = ms->m_header.h_nodeid;
4069
4070         error = create_lkb(ls, &lkb);
4071         if (error)
4072                 goto fail;
4073
4074         receive_flags(lkb, ms);
4075         lkb->lkb_flags |= DLM_IFL_MSTCPY;
4076         error = receive_request_args(ls, lkb, ms);
4077         if (error) {
4078                 __put_lkb(ls, lkb);
4079                 goto fail;
4080         }
4081
4082         /* The dir node is the authority on whether we are the master
4083            for this rsb or not, so if the master sends us a request, we should
4084            recreate the rsb if we've destroyed it.   This race happens when we
4085            send a remove message to the dir node at the same time that the dir
4086            node sends us a request for the rsb. */
4087
4088         namelen = receive_extralen(ms);
4089
4090         error = find_rsb(ls, ms->m_extra, namelen, from_nodeid,
4091                          R_RECEIVE_REQUEST, &r);
4092         if (error) {
4093                 __put_lkb(ls, lkb);
4094                 goto fail;
4095         }
4096
4097         lock_rsb(r);
4098
4099         if (r->res_master_nodeid != dlm_our_nodeid()) {
4100                 error = validate_master_nodeid(ls, r, from_nodeid);
4101                 if (error) {
4102                         unlock_rsb(r);
4103                         put_rsb(r);
4104                         __put_lkb(ls, lkb);
4105                         goto fail;
4106                 }
4107         }
4108
4109         attach_lkb(r, lkb);
4110         error = do_request(r, lkb);
4111         send_request_reply(r, lkb, error);
4112         do_request_effects(r, lkb, error);
4113
4114         unlock_rsb(r);
4115         put_rsb(r);
4116
4117         if (error == -EINPROGRESS)
4118                 error = 0;
4119         if (error)
4120                 dlm_put_lkb(lkb);
4121         return 0;
4122
4123  fail:
4124         /* TODO: instead of returning ENOTBLK, add the lkb to res_lookup
4125            and do this receive_request again from process_lookup_list once
4126            we get the lookup reply.  This would avoid a many repeated
4127            ENOTBLK request failures when the lookup reply designating us
4128            as master is delayed. */
4129
4130         /* We could repeatedly return -EBADR here if our send_remove() is
4131            delayed in being sent/arriving/being processed on the dir node.
4132            Another node would repeatedly lookup up the master, and the dir
4133            node would continue returning our nodeid until our send_remove
4134            took effect.
4135
4136            We send another remove message in case our previous send_remove
4137            was lost/ignored/missed somehow. */
4138