aboutsummaryrefslogtreecommitdiff
path: root/fs/dlm/lock.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/dlm/lock.c')
-rw-r--r--fs/dlm/lock.c3871
1 files changed, 3871 insertions, 0 deletions
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
new file mode 100644
index 00000000000..3f2befa4797
--- /dev/null
+++ b/fs/dlm/lock.c
@@ -0,0 +1,3871 @@
+/******************************************************************************
+*******************************************************************************
+**
+** Copyright (C) 2005 Red Hat, Inc. All rights reserved.
+**
+** This copyrighted material is made available to anyone wishing to use,
+** modify, copy, or redistribute it subject to the terms and conditions
+** of the GNU General Public License v.2.
+**
+*******************************************************************************
+******************************************************************************/
+
+/* Central locking logic has four stages:
+
+ dlm_lock()
+ dlm_unlock()
+
+ request_lock(ls, lkb)
+ convert_lock(ls, lkb)
+ unlock_lock(ls, lkb)
+ cancel_lock(ls, lkb)
+
+ _request_lock(r, lkb)
+ _convert_lock(r, lkb)
+ _unlock_lock(r, lkb)
+ _cancel_lock(r, lkb)
+
+ do_request(r, lkb)
+ do_convert(r, lkb)
+ do_unlock(r, lkb)
+ do_cancel(r, lkb)
+
+ Stage 1 (lock, unlock) is mainly about checking input args and
+ splitting into one of the four main operations:
+
+ dlm_lock = request_lock
+ dlm_lock+CONVERT = convert_lock
+ dlm_unlock = unlock_lock
+ dlm_unlock+CANCEL = cancel_lock
+
+ Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
+ provided to the next stage.
+
+ Stage 3, _xxxx_lock(), determines if the operation is local or remote.
+ When remote, it calls send_xxxx(), when local it calls do_xxxx().
+
+ Stage 4, do_xxxx(), is the guts of the operation. It manipulates the
+ given rsb and lkb and queues callbacks.
+
+ For remote operations, send_xxxx() results in the corresponding do_xxxx()
+ function being executed on the remote node. The connecting send/receive
+ calls on local (L) and remote (R) nodes:
+
+ L: send_xxxx() -> R: receive_xxxx()
+ R: do_xxxx()
+ L: receive_xxxx_reply() <- R: send_xxxx_reply()
+*/
+#include <linux/types.h>
+#include "dlm_internal.h"
+#include <linux/dlm_device.h>
+#include "memory.h"
+#include "lowcomms.h"
+#include "requestqueue.h"
+#include "util.h"
+#include "dir.h"
+#include "member.h"
+#include "lockspace.h"
+#include "ast.h"
+#include "lock.h"
+#include "rcom.h"
+#include "recover.h"
+#include "lvb_table.h"
+#include "user.h"
+#include "config.h"
+
+static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
+static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
+static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
+static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
+static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
+static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
+static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
+static int send_remove(struct dlm_rsb *r);
+static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
+static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
+ struct dlm_message *ms);
+static int receive_extralen(struct dlm_message *ms);
+
+/*
+ * Lock compatibilty matrix - thanks Steve
+ * UN = Unlocked state. Not really a state, used as a flag
+ * PD = Padding. Used to make the matrix a nice power of two in size
+ * Other states are the same as the VMS DLM.
+ * Usage: matrix[grmode+1][rqmode+1] (although m[rq+1][gr+1] is the same)
+ */
+
+static const int __dlm_compat_matrix[8][8] = {
+ /* UN NL CR CW PR PW EX PD */
+ {1, 1, 1, 1, 1, 1, 1, 0}, /* UN */
+ {1, 1, 1, 1, 1, 1, 1, 0}, /* NL */
+ {1, 1, 1, 1, 1, 1, 0, 0}, /* CR */
+ {1, 1, 1, 1, 0, 0, 0, 0}, /* CW */
+ {1, 1, 1, 0, 1, 0, 0, 0}, /* PR */
+ {1, 1, 1, 0, 0, 0, 0, 0}, /* PW */
+ {1, 1, 0, 0, 0, 0, 0, 0}, /* EX */
+ {0, 0, 0, 0, 0, 0, 0, 0} /* PD */
+};
+
+/*
+ * This defines the direction of transfer of LVB data.
+ * Granted mode is the row; requested mode is the column.
+ * Usage: matrix[grmode+1][rqmode+1]
+ * 1 = LVB is returned to the caller
+ * 0 = LVB is written to the resource
+ * -1 = nothing happens to the LVB
+ */
+
+const int dlm_lvb_operations[8][8] = {
+ /* UN NL CR CW PR PW EX PD*/
+ { -1, 1, 1, 1, 1, 1, 1, -1 }, /* UN */
+ { -1, 1, 1, 1, 1, 1, 1, 0 }, /* NL */
+ { -1, -1, 1, 1, 1, 1, 1, 0 }, /* CR */
+ { -1, -1, -1, 1, 1, 1, 1, 0 }, /* CW */
+ { -1, -1, -1, -1, 1, 1, 1, 0 }, /* PR */
+ { -1, 0, 0, 0, 0, 0, 1, 0 }, /* PW */
+ { -1, 0, 0, 0, 0, 0, 0, 0 }, /* EX */
+ { -1, 0, 0, 0, 0, 0, 0, 0 } /* PD */
+};
+
+#define modes_compat(gr, rq) \
+ __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
+
+int dlm_modes_compat(int mode1, int mode2)
+{
+ return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
+}
+
+/*
+ * Compatibility matrix for conversions with QUECVT set.
+ * Granted mode is the row; requested mode is the column.
+ * Usage: matrix[grmode+1][rqmode+1]
+ */
+
+static const int __quecvt_compat_matrix[8][8] = {
+ /* UN NL CR CW PR PW EX PD */
+ {0, 0, 0, 0, 0, 0, 0, 0}, /* UN */
+ {0, 0, 1, 1, 1, 1, 1, 0}, /* NL */
+ {0, 0, 0, 1, 1, 1, 1, 0}, /* CR */
+ {0, 0, 0, 0, 1, 1, 1, 0}, /* CW */
+ {0, 0, 0, 1, 0, 1, 1, 0}, /* PR */
+ {0, 0, 0, 0, 0, 0, 1, 0}, /* PW */
+ {0, 0, 0, 0, 0, 0, 0, 0}, /* EX */
+ {0, 0, 0, 0, 0, 0, 0, 0} /* PD */
+};
+
+void dlm_print_lkb(struct dlm_lkb *lkb)
+{
+ printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
+ " status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
+ lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
+ lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
+ lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
+}
+
+void dlm_print_rsb(struct dlm_rsb *r)
+{
+ printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
+ r->res_nodeid, r->res_flags, r->res_first_lkid,
+ r->res_recover_locks_count, r->res_name);
+}
+
+void dlm_dump_rsb(struct dlm_rsb *r)
+{
+ struct dlm_lkb *lkb;
+
+ dlm_print_rsb(r);
+
+ printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
+ list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
+ printk(KERN_ERR "rsb lookup list\n");
+ list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
+ dlm_print_lkb(lkb);
+ printk(KERN_ERR "rsb grant queue:\n");
+ list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
+ dlm_print_lkb(lkb);
+ printk(KERN_ERR "rsb convert queue:\n");
+ list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
+ dlm_print_lkb(lkb);
+ printk(KERN_ERR "rsb wait queue:\n");
+ list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
+ dlm_print_lkb(lkb);
+}
+
+/* Threads cannot use the lockspace while it's being recovered */
+
+static inline void lock_recovery(struct dlm_ls *ls)
+{
+ down_read(&ls->ls_in_recovery);
+}
+
+static inline void unlock_recovery(struct dlm_ls *ls)
+{
+ up_read(&ls->ls_in_recovery);
+}
+
+static inline int lock_recovery_try(struct dlm_ls *ls)
+{
+ return down_read_trylock(&ls->ls_in_recovery);
+}
+
+static inline int can_be_queued(struct dlm_lkb *lkb)
+{
+ return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
+}
+
+static inline int force_blocking_asts(struct dlm_lkb *lkb)
+{
+ return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
+}
+
+static inline int is_demoted(struct dlm_lkb *lkb)
+{
+ return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
+}
+
+static inline int is_remote(struct dlm_rsb *r)
+{
+ DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
+ return !!r->res_nodeid;
+}
+
+static inline int is_process_copy(struct dlm_lkb *lkb)
+{
+ return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
+}
+
+static inline int is_master_copy(struct dlm_lkb *lkb)
+{
+ if (lkb->lkb_flags & DLM_IFL_MSTCPY)
+ DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
+ return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
+}
+
+static inline int middle_conversion(struct dlm_lkb *lkb)
+{
+ if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
+ (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
+ return 1;
+ return 0;
+}
+
+static inline int down_conversion(struct dlm_lkb *lkb)
+{
+ return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
+}
+
+static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
+{
+ if (is_master_copy(lkb))
+ return;
+
+ DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
+
+ lkb->lkb_lksb->sb_status = rv;
+ lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
+
+ dlm_add_ast(lkb, AST_COMP);
+}
+
+static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
+{
+ if (is_master_copy(lkb))
+ send_bast(r, lkb, rqmode);
+ else {
+ lkb->lkb_bastmode = rqmode;
+ dlm_add_ast(lkb, AST_BAST);
+ }
+}
+
+/*
+ * Basic operations on rsb's and lkb's
+ */
+
+static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
+{
+ struct dlm_rsb *r;
+
+ r = allocate_rsb(ls, len);
+ if (!r)
+ return NULL;
+
+ r->res_ls = ls;
+ r->res_length = len;
+ memcpy(r->res_name, name, len);
+ mutex_init(&r->res_mutex);
+
+ INIT_LIST_HEAD(&r->res_lookup);
+ INIT_LIST_HEAD(&r->res_grantqueue);
+ INIT_LIST_HEAD(&r->res_convertqueue);
+ INIT_LIST_HEAD(&r->res_waitqueue);
+ INIT_LIST_HEAD(&r->res_root_list);
+ INIT_LIST_HEAD(&r->res_recover_list);
+
+ return r;
+}
+
+static int search_rsb_list(struct list_head *head, char *name, int len,
+ unsigned int flags, struct dlm_rsb **r_ret)
+{
+ struct dlm_rsb *r;
+ int error = 0;
+
+ list_for_each_entry(r, head, res_hashchain) {
+ if (len == r->res_length && !memcmp(name, r->res_name, len))
+ goto found;
+ }
+ return -EBADR;
+
+ found:
+ if (r->res_nodeid && (flags & R_MASTER))
+ error = -ENOTBLK;
+ *r_ret = r;
+ return error;
+}
+
+static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
+ unsigned int flags, struct dlm_rsb **r_ret)
+{
+ struct dlm_rsb *r;
+ int error;
+
+ error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
+ if (!error) {
+ kref_get(&r->res_ref);
+ goto out;
+ }
+ error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
+ if (error)
+ goto out;
+
+ list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
+
+ if (dlm_no_directory(ls))
+ goto out;
+
+ if (r->res_nodeid == -1) {
+ rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
+ r->res_first_lkid = 0;
+ } else if (r->res_nodeid > 0) {
+ rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
+ r->res_first_lkid = 0;
+ } else {
+ DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
+ DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
+ }
+ out:
+ *r_ret = r;
+ return error;
+}
+
+static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
+ unsigned int flags, struct dlm_rsb **r_ret)
+{
+ int error;
+ write_lock(&ls->ls_rsbtbl[b].lock);
+ error = _search_rsb(ls, name, len, b, flags, r_ret);
+ write_unlock(&ls->ls_rsbtbl[b].lock);
+ return error;
+}
+
+/*
+ * Find rsb in rsbtbl and potentially create/add one
+ *
+ * Delaying the release of rsb's has a similar benefit to applications keeping
+ * NL locks on an rsb, but without the guarantee that the cached master value
+ * will still be valid when the rsb is reused. Apps aren't always smart enough
+ * to keep NL locks on an rsb that they may lock again shortly; this can lead
+ * to excessive master lookups and removals if we don't delay the release.
+ *
+ * Searching for an rsb means looking through both the normal list and toss
+ * list. When found on the toss list the rsb is moved to the normal list with
+ * ref count of 1; when found on normal list the ref count is incremented.
+ */
+
+static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
+ unsigned int flags, struct dlm_rsb **r_ret)
+{
+ struct dlm_rsb *r, *tmp;
+ uint32_t hash, bucket;
+ int error = 0;
+
+ if (dlm_no_directory(ls))
+ flags |= R_CREATE;
+
+ hash = jhash(name, namelen, 0);
+ bucket = hash & (ls->ls_rsbtbl_size - 1);
+
+ error = search_rsb(ls, name, namelen, bucket, flags, &r);
+ if (!error)
+ goto out;
+
+ if (error == -EBADR && !(flags & R_CREATE))
+ goto out;
+
+ /* the rsb was found but wasn't a master copy */
+ if (error == -ENOTBLK)
+ goto out;
+
+ error = -ENOMEM;
+ r = create_rsb(ls, name, namelen);
+ if (!r)
+ goto out;
+
+ r->res_hash = hash;
+ r->res_bucket = bucket;
+ r->res_nodeid = -1;
+ kref_init(&r->res_ref);
+
+ /* With no directory, the master can be set immediately */
+ if (dlm_no_directory(ls)) {
+ int nodeid = dlm_dir_nodeid(r);
+ if (nodeid == dlm_our_nodeid())
+ nodeid = 0;
+ r->res_nodeid = nodeid;
+ }
+
+ write_lock(&ls->ls_rsbtbl[bucket].lock);
+ error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
+ if (!error) {
+ write_unlock(&ls->ls_rsbtbl[bucket].lock);
+ free_rsb(r);
+ r = tmp;
+ goto out;
+ }
+ list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
+ write_unlock(&ls->ls_rsbtbl[bucket].lock);
+ error = 0;
+ out:
+ *r_ret = r;
+ return error;
+}
+
+int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
+ unsigned int flags, struct dlm_rsb **r_ret)
+{
+ return find_rsb(ls, name, namelen, flags, r_ret);
+}
+
+/* This is only called to add a reference when the code already holds
+ a valid reference to the rsb, so there's no need for locking. */
+
+static inline void hold_rsb(struct dlm_rsb *r)
+{
+ kref_get(&r->res_ref);
+}
+
+void dlm_hold_rsb(struct dlm_rsb *r)
+{
+ hold_rsb(r);
+}
+
+static void toss_rsb(struct kref *kref)
+{
+ struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
+ struct dlm_ls *ls = r->res_ls;
+
+ DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
+ kref_init(&r->res_ref);
+ list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
+ r->res_toss_time = jiffies;
+ if (r->res_lvbptr) {
+ free_lvb(r->res_lvbptr);
+ r->res_lvbptr = NULL;
+ }
+}
+
+/* When all references to the rsb are gone it's transfered to
+ the tossed list for later disposal. */
+
+static void put_rsb(struct dlm_rsb *r)
+{
+ struct dlm_ls *ls = r->res_ls;
+ uint32_t bucket = r->res_bucket;
+
+ write_lock(&ls->ls_rsbtbl[bucket].lock);
+ kref_put(&r->res_ref, toss_rsb);
+ write_unlock(&ls->ls_rsbtbl[bucket].lock);
+}
+
+void dlm_put_rsb(struct dlm_rsb *r)
+{
+ put_rsb(r);
+}
+
+/* See comment for unhold_lkb */
+
+static void unhold_rsb(struct dlm_rsb *r)
+{
+ int rv;
+ rv = kref_put(&r->res_ref, toss_rsb);
+ DLM_ASSERT(!rv, dlm_dump_rsb(r););
+}
+
+static void kill_rsb(struct kref *kref)
+{
+ struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
+
+ /* All work is done after the return from kref_put() so we
+ can release the write_lock before the remove and free. */
+
+ DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
+ DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
+ DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
+ DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
+ DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
+ DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
+}
+
+/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
+ The rsb must exist as long as any lkb's for it do. */
+
+static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
+{
+ hold_rsb(r);
+ lkb->lkb_resource = r;
+}
+
+static void detach_lkb(struct dlm_lkb *lkb)
+{
+ if (lkb->lkb_resource) {
+ put_rsb(lkb->lkb_resource);
+ lkb->lkb_resource = NULL;
+ }
+}
+
+static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
+{
+ struct dlm_lkb *lkb, *tmp;
+ uint32_t lkid = 0;
+ uint16_t bucket;
+
+ lkb = allocate_lkb(ls);
+ if (!lkb)
+ return -ENOMEM;
+
+ lkb->lkb_nodeid = -1;
+ lkb->lkb_grmode = DLM_LOCK_IV;
+ kref_init(&lkb->lkb_ref);
+ INIT_LIST_HEAD(&lkb->lkb_ownqueue);
+
+ get_random_bytes(&bucket, sizeof(bucket));
+ bucket &= (ls->ls_lkbtbl_size - 1);
+
+ write_lock(&ls->ls_lkbtbl[bucket].lock);
+
+ /* counter can roll over so we must verify lkid is not in use */
+
+ while (lkid == 0) {
+ lkid = bucket | (ls->ls_lkbtbl[bucket].counter++ << 16);
+
+ list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
+ lkb_idtbl_list) {
+ if (tmp->lkb_id != lkid)
+ continue;
+ lkid = 0;
+ break;
+ }
+ }
+
+ lkb->lkb_id = lkid;
+ list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
+ write_unlock(&ls->ls_lkbtbl[bucket].lock);
+
+ *lkb_ret = lkb;
+ return 0;
+}
+
+static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
+{
+ uint16_t bucket = lkid & 0xFFFF;
+ struct dlm_lkb *lkb;
+
+ list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
+ if (lkb->lkb_id == lkid)
+ return lkb;
+ }
+ return NULL;
+}
+
+static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
+{
+ struct dlm_lkb *lkb;
+ uint16_t bucket = lkid & 0xFFFF;
+
+ if (bucket >= ls->ls_lkbtbl_size)
+ return -EBADSLT;
+
+ read_lock(&ls->ls_lkbtbl[bucket].lock);
+ lkb = __find_lkb(ls, lkid);
+ if (lkb)
+ kref_get(&lkb->lkb_ref);
+ read_unlock(&ls->ls_lkbtbl[bucket].lock);
+
+ *lkb_ret = lkb;
+ return lkb ? 0 : -ENOENT;
+}
+
+static void kill_lkb(struct kref *kref)
+{
+ struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
+
+ /* All work is done after the return from kref_put() so we
+ can release the write_lock before the detach_lkb */
+
+ DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
+}
+
+/* __put_lkb() is used when an lkb may not have an rsb attached to
+ it so we need to provide the lockspace explicitly */
+
+static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
+{
+ uint16_t bucket = lkb->lkb_id & 0xFFFF;
+
+ write_lock(&ls->ls_lkbtbl[bucket].lock);
+ if (kref_put(&lkb->lkb_ref, kill_lkb)) {
+ list_del(&lkb->lkb_idtbl_list);
+ write_unlock(&ls->ls_lkbtbl[bucket].lock);
+
+ detach_lkb(lkb);
+
+ /* for local/process lkbs, lvbptr points to caller's lksb */
+ if (lkb->lkb_lvbptr && is_master_copy(lkb))
+ free_lvb(lkb->lkb_lvbptr);
+ free_lkb(lkb);
+ return 1;
+ } else {
+ write_unlock(&ls->ls_lkbtbl[bucket].lock);
+ return 0;
+ }
+}
+
+int dlm_put_lkb(struct dlm_lkb *lkb)
+{
+ struct dlm_ls *ls;
+
+ DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
+ DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
+
+ ls = lkb->lkb_resource->res_ls;
+ return __put_lkb(ls, lkb);
+}
+
+/* This is only called to add a reference when the code already holds
+ a valid reference to the lkb, so there's no need for locking. */
+
+static inline void hold_lkb(struct dlm_lkb *lkb)
+{
+ kref_get(&lkb->lkb_ref);
+}
+
+/* This is called when we need to remove a reference and are certain
+ it's not the last ref. e.g. del_lkb is always called between a
+ find_lkb/put_lkb and is always the inverse of a previous add_lkb.
+ put_lkb would work fine, but would involve unnecessary locking */
+
+static inline void unhold_lkb(struct dlm_lkb *lkb)
+{
+ int rv;
+ rv = kref_put(&lkb->lkb_ref, kill_lkb);
+ DLM_ASSERT(!rv, dlm_print_lkb(lkb););
+}
+
+static void lkb_add_ordered(struct list_head *new, struct list_head *head,
+ int mode)
+{
+ struct dlm_lkb *lkb = NULL;
+
+ list_for_each_entry(lkb, head, lkb_statequeue)
+ if (lkb->lkb_rqmode < mode)
+ break;
+
+ if (!lkb)
+ list_add_tail(new, head);
+ else
+ __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
+}
+
+/* add/remove lkb to rsb's grant/convert/wait queue */
+
+static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
+{
+ kref_get(&lkb->lkb_ref);
+
+ DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
+
+ lkb->lkb_status = status;
+
+ switch (status) {
+ case DLM_LKSTS_WAITING:
+ if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
+ list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
+ else
+ list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
+ break;
+ case DLM_LKSTS_GRANTED:
+ /* convention says granted locks kept in order of grmode */
+ lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
+ lkb->lkb_grmode);
+ break;
+ case DLM_LKSTS_CONVERT:
+ if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
+ list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
+ else
+ list_add_tail(&lkb->lkb_statequeue,
+ &r->res_convertqueue);
+ break;
+ default:
+ DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
+ }
+}
+
+static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
+{
+ lkb->lkb_status = 0;
+ list_del(&lkb->lkb_statequeue);
+ unhold_lkb(lkb);
+}
+
+static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
+{
+ hold_lkb(lkb);
+ del_lkb(r, lkb);
+ add_lkb(r, lkb, sts);
+ unhold_lkb(lkb);
+}
+
+/* add/remove lkb from global waiters list of lkb's waiting for
+ a reply from a remote node */
+
+static void add_to_waiters(struct dlm_lkb *lkb, int mstype)
+{
+ struct dlm_ls *ls = lkb->lkb_resource->res_ls;
+
+ mutex_lock(&ls->ls_waiters_mutex);
+ if (lkb->lkb_wait_type) {
+ log_print("add_to_waiters error %d", lkb->lkb_wait_type);
+ goto out;
+ }
+ lkb->lkb_wait_type = mstype;
+ kref_get(&lkb->lkb_ref);
+ list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
+ out:
+ mutex_unlock(&ls->ls_waiters_mutex);
+}
+
+static int _remove_from_waiters(struct dlm_lkb *lkb)
+{
+ int error = 0;
+
+ if (!lkb->lkb_wait_type) {
+ log_print("remove_from_waiters error");
+ error = -EINVAL;
+ goto out;
+ }
+ lkb->lkb_wait_type = 0;
+ list_del(&lkb->lkb_wait_reply);
+ unhold_lkb(lkb);
+ out:
+ return error;
+}
+
+static int remove_from_waiters(struct dlm_lkb *lkb)
+{
+ struct dlm_ls *ls = lkb->lkb_resource->res_ls;
+ int error;
+
+ mutex_lock(&ls->ls_waiters_mutex);
+ error = _remove_from_waiters(lkb);
+ mutex_unlock(&ls->ls_waiters_mutex);
+ return error;
+}
+
+static void dir_remove(struct dlm_rsb *r)
+{
+ int to_nodeid;
+
+ if (dlm_no_directory(r->res_ls))
+ return;
+
+ to_nodeid = dlm_dir_nodeid(r);
+ if (to_nodeid != dlm_our_nodeid())
+ send_remove(r);
+ else
+ dlm_dir_remove_entry(r->res_ls, to_nodeid,
+ r->res_name, r->res_length);
+}
+
+/* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
+ found since they are in order of newest to oldest? */
+
+static int shrink_bucket(struct dlm_ls *ls, int b)
+{
+ struct dlm_rsb *r;
+ int count = 0, found;
+
+ for (;;) {
+ found = 0;
+ write_lock(&ls->ls_rsbtbl[b].lock);
+ list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
+ res_hashchain) {
+ if (!time_after_eq(jiffies, r->res_toss_time +
+ dlm_config.toss_secs * HZ))
+ continue;
+ found = 1;
+ break;
+ }
+
+ if (!found) {
+ write_unlock(&ls->ls_rsbtbl[b].lock);
+ break;
+ }
+
+ if (kref_put(&r->res_ref, kill_rsb)) {
+ list_del(&r->res_hashchain);
+ write_unlock(&ls->ls_rsbtbl[b].lock);
+
+ if (is_master(r))
+ dir_remove(r);
+ free_rsb(r);
+ count++;
+ } else {
+ write_unlock(&ls->ls_rsbtbl[b].lock);
+ log_error(ls, "tossed rsb in use %s", r->res_name);
+ }
+ }
+
+ return count;
+}
+
+void dlm_scan_rsbs(struct dlm_ls *ls)
+{
+ int i;
+
+ if (dlm_locking_stopped(ls))
+ return;
+
+ for (i = 0; i < ls->ls_rsbtbl_size; i++) {
+ shrink_bucket(ls, i);
+ cond_resched();
+ }
+}
+
+/* lkb is master or local copy */
+
+static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
+{
+ int b, len = r->res_ls->ls_lvblen;
+
+ /* b=1 lvb returned to caller
+ b=0 lvb written to rsb or invalidated
+ b=-1 do nothing */
+
+ b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
+
+ if (b == 1) {
+ if (!lkb->lkb_lvbptr)
+ return;
+
+ if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
+ return;
+
+ if (!r->res_lvbptr)
+ return;
+
+ memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
+ lkb->lkb_lvbseq = r->res_lvbseq;
+
+ } else if (b == 0) {
+ if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
+ rsb_set_flag(r, RSB_VALNOTVALID);
+ return;
+ }
+
+ if (!lkb->lkb_lvbptr)
+ return;
+
+ if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
+ return;
+
+ if (!r->res_lvbptr)
+ r->res_lvbptr = allocate_lvb(r->res_ls);
+
+ if (!r->res_lvbptr)
+ return;
+
+ memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
+ r->res_lvbseq++;
+ lkb->lkb_lvbseq = r->res_lvbseq;
+ rsb_clear_flag(r, RSB_VALNOTVALID);
+ }
+
+ if (rsb_flag(r, RSB_VALNOTVALID))
+ lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
+}
+
+static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
+{
+ if (lkb->lkb_grmode < DLM_LOCK_PW)
+ return;
+
+ if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
+ rsb_set_flag(r, RSB_VALNOTVALID);
+ return;
+ }
+
+ if (!lkb->lkb_lvbptr)
+ return;
+
+ if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
+ return;
+
+ if (!r->res_lvbptr)
+ r->res_lvbptr = allocate_lvb(r->res_ls);
+
+ if (!r->res_lvbptr)
+ return;
+
+ memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
+ r->res_lvbseq++;
+ rsb_clear_flag(r, RSB_VALNOTVALID);
+}
+
+/* lkb is process copy (pc) */
+
+static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
+ struct dlm_message *ms)
+{
+ int b;
+
+ if (!lkb->lkb_lvbptr)
+ return;
+
+ if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
+ return;
+
+ b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
+ if (b == 1) {
+ int len = receive_extralen(ms);
+ memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
+ lkb->lkb_lvbseq = ms->m_lvbseq;
+ }
+}
+
+/* Manipulate lkb's on rsb's convert/granted/waiting queues
+ remove_lock -- used for unlock, removes lkb from granted
+ revert_lock -- used for cancel, moves lkb from convert to granted
+ grant_lock -- used for request and convert, adds lkb to granted or
+ moves lkb from convert or waiting to granted
+
+ Each of these is used for master or local copy lkb's. There is
+ also a _pc() variation used to make the corresponding change on
+ a process copy (pc) lkb. */
+
+static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
+{
+ del_lkb(r, lkb);
+ lkb->lkb_grmode = DLM_LOCK_IV;
+ /* this unhold undoes the original ref from create_lkb()
+ so this leads to the lkb being freed */
+ unhold_lkb(lkb);
+}
+
+static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
+{
+ set_lvb_unlock(r, lkb);
+ _remove_lock(r, lkb);
+}
+
+static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
+{
+ _remove_lock(r, lkb);
+}
+
+static void revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
+{
+ lkb->lkb_rqmode = DLM_LOCK_IV;
+
+ switch (lkb->lkb_status) {
+ case DLM_LKSTS_GRANTED:
+ break;
+ case DLM_LKSTS_CONVERT:
+ move_lkb(r, lkb, DLM_LKSTS_GRANTED);
+ break;
+ case DLM_LKSTS_WAITING:
+ del_lkb(r, lkb);
+ lkb->lkb_grmode = DLM_LOCK_IV;
+ /* this unhold undoes the original ref from create_lkb()
+ so this leads to the lkb being freed */
+ unhold_lkb(lkb);
+ break;
+ default:
+ log_print("invalid status for revert %d", lkb->lkb_status);
+ }
+}
+
+static void revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
+{
+ revert_lock(r, lkb);
+}
+
+static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
+{
+ if (lkb->lkb_grmode != lkb->lkb_rqmode) {
+ lkb->lkb_grmode = lkb->lkb_rqmode;
+ if (lkb->lkb_status)
+ move_lkb(r, lkb, DLM_LKSTS_GRANTED);
+ else
+ add_lkb(r, lkb, DLM_LKSTS_GRANTED);
+ }
+
+ lkb->lkb_rqmode = DLM_LOCK_IV;
+}
+
+static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
+{
+ set_lvb_lock(r, lkb);
+ _grant_lock(r, lkb);
+ lkb->lkb_highbast = 0;
+}
+
+static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
+ struct dlm_message *ms)
+{
+ set_lvb_lock_pc(r, lkb, ms);
+ _grant_lock(r, lkb);
+}
+
+/* called by grant_pending_locks() which means an async grant message must
+ be sent to the requesting node in addition to granting the lock if the
+ lkb belongs to a remote node. */
+
+static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
+{
+ grant_lock(r, lkb);
+ if (is_master_copy(lkb))
+ send_grant(r, lkb);
+ else
+ queue_cast(r, lkb, 0);
+}
+
+static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
+{
+ struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
+ lkb_statequeue);
+ if (lkb->lkb_id == first->lkb_id)
+ return 1;
+
+ return 0;
+}
+
+/* Check if the given lkb conflicts with another lkb on the queue. */
+
+static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
+{
+ struct dlm_lkb *this;
+
+ list_for_each_entry(this, head, lkb_statequeue) {
+ if (this == lkb)
+ continue;
+ if (!modes_compat(this, lkb))
+ return 1;
+ }
+ return 0;
+}
+
+/*
+ * "A conversion deadlock arises with a pair of lock requests in the converting
+ * queue for one resource. The granted mode of each lock blocks the requested
+ * mode of the other lock."
+ *
+ * Part 2: if the granted mode of lkb is preventing the first lkb in the
+ * convert queue from being granted, then demote lkb (set grmode to NL).
+ * This second form requires that we check for conv-deadlk even when
+ * now == 0 in _can_be_granted().
+ *
+ * Example:
+ * Granted Queue: empty
+ * Convert Queue: NL->EX (first lock)
+ * PR->EX (second lock)
+ *
+ * The first lock can't be granted because of the granted mode of the second
+ * lock and the second lock can't be granted because it's not first in the
+ * list. We demote the granted mode of the second lock (the lkb passed to this
+ * function).
+ *
+ * After the resolution, the "grant pending" function needs to go back and try
+ * to grant locks on the convert queue again since the first lock can now be
+ * granted.
+ */
+
+static int conversion_deadlock_detect(struct dlm_rsb *rsb, struct dlm_lkb *lkb)
+{
+ struct dlm_lkb *this, *first = NULL, *self = NULL;
+
+ list_for_each_entry(this, &rsb->res_convertqueue, lkb_statequeue) {
+ if (!first)
+ first = this;
+ if (this == lkb) {
+ self = lkb;
+ continue;
+ }
+
+ if (!modes_compat(this, lkb) && !modes_compat(lkb, this))
+ return 1;
+ }
+
+ /* if lkb is on the convert queue and is preventing the first
+ from being granted, then there's deadlock and we demote lkb.
+ multiple converting locks may need to do this before the first
+ converting lock can be granted. */
+
+ if (self && self != first) {
+ if (!modes_compat(lkb, first) &&
+ !queue_conflict(&rsb->res_grantqueue, first))
+ return 1;
+ }
+
+ return 0;
+}
+
+/*
+ * Return 1 if the lock can be granted, 0 otherwise.
+ * Also detect and resolve conversion deadlocks.
+ *
+ * lkb is the lock to be granted
+ *
+ * now is 1 if the function is being called in the context of the
+ * immediate request, it is 0 if called later, after the lock has been
+ * queued.
+ *
+ * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
+ */
+
+static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
+{
+ int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
+
+ /*
+ * 6-10: Version 5.4 introduced an option to address the phenomenon of
+ * a new request for a NL mode lock being blocked.
+ *
+ * 6-11: If the optional EXPEDITE flag is used with the new NL mode
+ * request, then it would be granted. In essence, the use of this flag
+ * tells the Lock Manager to expedite theis request by not considering
+ * what may be in the CONVERTING or WAITING queues... As of this
+ * writing, the EXPEDITE flag can be used only with new requests for NL
+ * mode locks. This flag is not valid for conversion requests.
+ *
+ * A shortcut. Earlier checks return an error if EXPEDITE is used in a
+ * conversion or used with a non-NL requested mode. We also know an
+ * EXPEDITE request is always granted immediately, so now must always
+ * be 1. The full condition to grant an expedite request: (now &&
+ * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
+ * therefore be shortened to just checking the flag.
+ */
+
+ if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
+ return 1;
+
+ /*
+ * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
+ * added to the remaining conditions.
+ */
+
+ if (queue_conflict(&r->res_grantqueue, lkb))
+ goto out;
+
+ /*
+ * 6-3: By default, a conversion request is immediately granted if the