diff options
Diffstat (limited to 'fs/dlm')
-rw-r--r-- | fs/dlm/Kconfig | 31 | ||||
-rw-r--r-- | fs/dlm/Makefile | 6 | ||||
-rw-r--r-- | fs/dlm/ast.c | 1 | ||||
-rw-r--r-- | fs/dlm/config.c | 10 | ||||
-rw-r--r-- | fs/dlm/config.h | 3 | ||||
-rw-r--r-- | fs/dlm/dlm_internal.h | 11 | ||||
-rw-r--r-- | fs/dlm/lock.c | 955 | ||||
-rw-r--r-- | fs/dlm/lock.h | 2 | ||||
-rw-r--r-- | fs/dlm/lockspace.c | 4 | ||||
-rw-r--r-- | fs/dlm/lowcomms-sctp.c | 1210 | ||||
-rw-r--r-- | fs/dlm/lowcomms.c (renamed from fs/dlm/lowcomms-tcp.c) | 788 | ||||
-rw-r--r-- | fs/dlm/user.c | 163 |
12 files changed, 1508 insertions, 1676 deletions
diff --git a/fs/dlm/Kconfig b/fs/dlm/Kconfig index 6fa7b0d5c04..69a94690e49 100644 --- a/fs/dlm/Kconfig +++ b/fs/dlm/Kconfig @@ -3,36 +3,19 @@ menu "Distributed Lock Manager" config DLM tristate "Distributed Lock Manager (DLM)" - depends on SYSFS && (IPV6 || IPV6=n) + depends on IPV6 || IPV6=n select CONFIGFS_FS - select IP_SCTP if DLM_SCTP + select IP_SCTP help - A general purpose distributed lock manager for kernel or userspace - applications. - -choice - prompt "Select DLM communications protocol" - depends on DLM - default DLM_TCP - help - The DLM Can use TCP or SCTP for it's network communications. - SCTP supports multi-homed operations whereas TCP doesn't. - However, SCTP seems to have stability problems at the moment. - -config DLM_TCP - bool "TCP/IP" - -config DLM_SCTP - bool "SCTP" - -endchoice + A general purpose distributed lock manager for kernel or userspace + applications. config DLM_DEBUG bool "DLM debugging" depends on DLM help - Under the debugfs mount point, the name of each lockspace will - appear as a file in the "dlm" directory. The output is the - list of resource and locks the local node knows about. + Under the debugfs mount point, the name of each lockspace will + appear as a file in the "dlm" directory. The output is the + list of resource and locks the local node knows about. endmenu diff --git a/fs/dlm/Makefile b/fs/dlm/Makefile index 65388944eba..604cf7dc5f3 100644 --- a/fs/dlm/Makefile +++ b/fs/dlm/Makefile @@ -8,14 +8,12 @@ dlm-y := ast.o \ member.o \ memory.o \ midcomms.o \ + lowcomms.o \ rcom.o \ recover.o \ recoverd.o \ requestqueue.o \ user.o \ - util.o + util.o dlm-$(CONFIG_DLM_DEBUG) += debug_fs.o -dlm-$(CONFIG_DLM_TCP) += lowcomms-tcp.o - -dlm-$(CONFIG_DLM_SCTP) += lowcomms-sctp.o
\ No newline at end of file diff --git a/fs/dlm/ast.c b/fs/dlm/ast.c index f91d39cb1e0..6308122890c 100644 --- a/fs/dlm/ast.c +++ b/fs/dlm/ast.c @@ -14,6 +14,7 @@ #include "dlm_internal.h" #include "lock.h" #include "user.h" +#include "ast.h" #define WAKE_ASTS 0 diff --git a/fs/dlm/config.c b/fs/dlm/config.c index 8665c88e5af..822abdcd143 100644 --- a/fs/dlm/config.c +++ b/fs/dlm/config.c @@ -2,7 +2,7 @@ ******************************************************************************* ** ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. -** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved. +** Copyright (C) 2004-2007 Red Hat, Inc. All rights reserved. ** ** This copyrighted material is made available to anyone wishing to use, ** modify, copy, or redistribute it subject to the terms and conditions @@ -89,6 +89,7 @@ struct cluster { unsigned int cl_toss_secs; unsigned int cl_scan_secs; unsigned int cl_log_debug; + unsigned int cl_protocol; }; enum { @@ -101,6 +102,7 @@ enum { CLUSTER_ATTR_TOSS_SECS, CLUSTER_ATTR_SCAN_SECS, CLUSTER_ATTR_LOG_DEBUG, + CLUSTER_ATTR_PROTOCOL, }; struct cluster_attribute { @@ -159,6 +161,7 @@ CLUSTER_ATTR(recover_timer, 1); CLUSTER_ATTR(toss_secs, 1); CLUSTER_ATTR(scan_secs, 1); CLUSTER_ATTR(log_debug, 0); +CLUSTER_ATTR(protocol, 0); static struct configfs_attribute *cluster_attrs[] = { [CLUSTER_ATTR_TCP_PORT] = &cluster_attr_tcp_port.attr, @@ -170,6 +173,7 @@ static struct configfs_attribute *cluster_attrs[] = { [CLUSTER_ATTR_TOSS_SECS] = &cluster_attr_toss_secs.attr, [CLUSTER_ATTR_SCAN_SECS] = &cluster_attr_scan_secs.attr, [CLUSTER_ATTR_LOG_DEBUG] = &cluster_attr_log_debug.attr, + [CLUSTER_ATTR_PROTOCOL] = &cluster_attr_protocol.attr, NULL, }; @@ -904,6 +908,7 @@ int dlm_our_addr(struct sockaddr_storage *addr, int num) #define DEFAULT_TOSS_SECS 10 #define DEFAULT_SCAN_SECS 5 #define DEFAULT_LOG_DEBUG 0 +#define DEFAULT_PROTOCOL 0 struct dlm_config_info dlm_config = { .ci_tcp_port = DEFAULT_TCP_PORT, @@ -914,6 +919,7 @@ struct dlm_config_info dlm_config = { .ci_recover_timer = DEFAULT_RECOVER_TIMER, .ci_toss_secs = DEFAULT_TOSS_SECS, .ci_scan_secs = DEFAULT_SCAN_SECS, - .ci_log_debug = DEFAULT_LOG_DEBUG + .ci_log_debug = DEFAULT_LOG_DEBUG, + .ci_protocol = DEFAULT_PROTOCOL }; diff --git a/fs/dlm/config.h b/fs/dlm/config.h index 1e978611a96..967cc3d72e5 100644 --- a/fs/dlm/config.h +++ b/fs/dlm/config.h @@ -2,7 +2,7 @@ ******************************************************************************* ** ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. -** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved. +** Copyright (C) 2004-2007 Red Hat, Inc. All rights reserved. ** ** This copyrighted material is made available to anyone wishing to use, ** modify, copy, or redistribute it subject to the terms and conditions @@ -26,6 +26,7 @@ struct dlm_config_info { int ci_toss_secs; int ci_scan_secs; int ci_log_debug; + int ci_protocol; }; extern struct dlm_config_info dlm_config; diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h index 61d93201e1b..30994d68f6a 100644 --- a/fs/dlm/dlm_internal.h +++ b/fs/dlm/dlm_internal.h @@ -2,7 +2,7 @@ ******************************************************************************* ** ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. -** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved. +** Copyright (C) 2004-2007 Red Hat, Inc. All rights reserved. ** ** This copyrighted material is made available to anyone wishing to use, ** modify, copy, or redistribute it subject to the terms and conditions @@ -210,6 +210,9 @@ struct dlm_args { #define DLM_IFL_MSTCPY 0x00010000 #define DLM_IFL_RESEND 0x00020000 #define DLM_IFL_DEAD 0x00040000 +#define DLM_IFL_OVERLAP_UNLOCK 0x00080000 +#define DLM_IFL_OVERLAP_CANCEL 0x00100000 +#define DLM_IFL_ENDOFLIFE 0x00200000 #define DLM_IFL_USER 0x00000001 #define DLM_IFL_ORPHAN 0x00000002 @@ -230,8 +233,8 @@ struct dlm_lkb { int8_t lkb_grmode; /* granted lock mode */ int8_t lkb_bastmode; /* requested mode */ int8_t lkb_highbast; /* highest mode bast sent for */ - int8_t lkb_wait_type; /* type of reply waiting for */ + int8_t lkb_wait_count; int8_t lkb_ast_type; /* type of ast queued for */ struct list_head lkb_idtbl_list; /* lockspace lkbtbl */ @@ -339,6 +342,7 @@ struct dlm_header { #define DLM_MSG_LOOKUP 11 #define DLM_MSG_REMOVE 12 #define DLM_MSG_LOOKUP_REPLY 13 +#define DLM_MSG_PURGE 14 struct dlm_message { struct dlm_header m_header; @@ -440,6 +444,9 @@ struct dlm_ls { struct mutex ls_waiters_mutex; struct list_head ls_waiters; /* lkbs needing a reply */ + struct mutex ls_orphans_mutex; + struct list_head ls_orphans; + struct list_head ls_nodes; /* current nodes in ls */ struct list_head ls_nodes_gone; /* dead node list, recovery */ int ls_num_nodes; /* number of nodes in ls */ diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index e725005fafd..d8d6e729f96 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -1,7 +1,7 @@ /****************************************************************************** ******************************************************************************* ** -** Copyright (C) 2005 Red Hat, Inc. All rights reserved. +** Copyright (C) 2005-2007 Red Hat, Inc. All rights reserved. ** ** This copyrighted material is made available to anyone wishing to use, ** modify, copy, or redistribute it subject to the terms and conditions @@ -85,6 +85,7 @@ static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb); static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, struct dlm_message *ms); static int receive_extralen(struct dlm_message *ms); +static void do_purge(struct dlm_ls *ls, int nodeid, int pid); /* * Lock compatibilty matrix - thanks Steve @@ -223,6 +224,16 @@ static inline int is_demoted(struct dlm_lkb *lkb) return (lkb->lkb_sbflags & DLM_SBF_DEMOTED); } +static inline int is_altmode(struct dlm_lkb *lkb) +{ + return (lkb->lkb_sbflags & DLM_SBF_ALTMODE); +} + +static inline int is_granted(struct dlm_lkb *lkb) +{ + return (lkb->lkb_status == DLM_LKSTS_GRANTED); +} + static inline int is_remote(struct dlm_rsb *r) { DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r);); @@ -254,6 +265,22 @@ static inline int down_conversion(struct dlm_lkb *lkb) return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode); } +static inline int is_overlap_unlock(struct dlm_lkb *lkb) +{ + return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK; +} + +static inline int is_overlap_cancel(struct dlm_lkb *lkb) +{ + return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL; +} + +static inline int is_overlap(struct dlm_lkb *lkb) +{ + return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK | + DLM_IFL_OVERLAP_CANCEL)); +} + static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) { if (is_master_copy(lkb)) @@ -267,6 +294,12 @@ static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) dlm_add_ast(lkb, AST_COMP); } +static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb) +{ + queue_cast(r, lkb, + is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL); +} + static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode) { if (is_master_copy(lkb)) @@ -547,6 +580,7 @@ static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret) lkb->lkb_grmode = DLM_LOCK_IV; kref_init(&lkb->lkb_ref); INIT_LIST_HEAD(&lkb->lkb_ownqueue); + INIT_LIST_HEAD(&lkb->lkb_rsb_lookup); get_random_bytes(&bucket, sizeof(bucket)); bucket &= (ls->ls_lkbtbl_size - 1); @@ -556,7 +590,7 @@ static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret) /* counter can roll over so we must verify lkid is not in use */ while (lkid == 0) { - lkid = bucket | (ls->ls_lkbtbl[bucket].counter++ << 16); + lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++; list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) { @@ -577,8 +611,8 @@ static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret) static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid) { - uint16_t bucket = lkid & 0xFFFF; struct dlm_lkb *lkb; + uint16_t bucket = (lkid >> 16); list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) { if (lkb->lkb_id == lkid) @@ -590,7 +624,7 @@ static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid) static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret) { struct dlm_lkb *lkb; - uint16_t bucket = lkid & 0xFFFF; + uint16_t bucket = (lkid >> 16); if (bucket >= ls->ls_lkbtbl_size) return -EBADSLT; @@ -620,7 +654,7 @@ static void kill_lkb(struct kref *kref) static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb) { - uint16_t bucket = lkb->lkb_id & 0xFFFF; + uint16_t bucket = (lkb->lkb_id >> 16); write_lock(&ls->ls_lkbtbl[bucket].lock); if (kref_put(&lkb->lkb_ref, kill_lkb)) { @@ -735,23 +769,75 @@ static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts) unhold_lkb(lkb); } +static int msg_reply_type(int mstype) +{ + switch (mstype) { + case DLM_MSG_REQUEST: + return DLM_MSG_REQUEST_REPLY; + case DLM_MSG_CONVERT: + return DLM_MSG_CONVERT_REPLY; + case DLM_MSG_UNLOCK: + return DLM_MSG_UNLOCK_REPLY; + case DLM_MSG_CANCEL: + return DLM_MSG_CANCEL_REPLY; + case DLM_MSG_LOOKUP: + return DLM_MSG_LOOKUP_REPLY; + } + return -1; +} + /* add/remove lkb from global waiters list of lkb's waiting for a reply from a remote node */ -static void add_to_waiters(struct dlm_lkb *lkb, int mstype) +static int add_to_waiters(struct dlm_lkb *lkb, int mstype) { struct dlm_ls *ls = lkb->lkb_resource->res_ls; + int error = 0; mutex_lock(&ls->ls_waiters_mutex); - if (lkb->lkb_wait_type) { - log_print("add_to_waiters error %d", lkb->lkb_wait_type); + + if (is_overlap_unlock(lkb) || + (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) { + error = -EINVAL; + goto out; + } + + if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) { + switch (mstype) { + case DLM_MSG_UNLOCK: + lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK; + break; + case DLM_MSG_CANCEL: + lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL; + break; + default: + error = -EBUSY; + goto out; + } + lkb->lkb_wait_count++; + hold_lkb(lkb); + + log_debug(ls, "add overlap %x cur %d new %d count %d flags %x", + lkb->lkb_id, lkb->lkb_wait_type, mstype, + lkb->lkb_wait_count, lkb->lkb_flags); goto out; } + + DLM_ASSERT(!lkb->lkb_wait_count, + dlm_print_lkb(lkb); + printk("wait_count %d\n", lkb->lkb_wait_count);); + + lkb->lkb_wait_count++; lkb->lkb_wait_type = mstype; - kref_get(&lkb->lkb_ref); + hold_lkb(lkb); list_add(&lkb->lkb_wait_reply, &ls->ls_waiters); out: + if (error) + log_error(ls, "add_to_waiters %x error %d flags %x %d %d %s", + lkb->lkb_id, error, lkb->lkb_flags, mstype, + lkb->lkb_wait_type, lkb->lkb_resource->res_name); mutex_unlock(&ls->ls_waiters_mutex); + return error; } /* We clear the RESEND flag because we might be taking an lkb off the waiters @@ -759,34 +845,85 @@ static void add_to_waiters(struct dlm_lkb *lkb, int mstype) request reply on the requestqueue) between dlm_recover_waiters_pre() which set RESEND and dlm_recover_waiters_post() */ -static int _remove_from_waiters(struct dlm_lkb *lkb) +static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype) { - int error = 0; + struct dlm_ls *ls = lkb->lkb_resource->res_ls; + int overlap_done = 0; - if (!lkb->lkb_wait_type) { - log_print("remove_from_waiters error"); - error = -EINVAL; - goto out; + if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) { + lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK; + overlap_done = 1; + goto out_del; + } + + if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) { + lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; + overlap_done = 1; + goto out_del; + } + + /* N.B. type of reply may not always correspond to type of original + msg due to lookup->request optimization, verify others? */ + + if (lkb->lkb_wait_type) { + lkb->lkb_wait_type = 0; + goto out_del; + } + + log_error(ls, "remove_from_waiters lkid %x flags %x types %d %d", + lkb->lkb_id, lkb->lkb_flags, mstype, lkb->lkb_wait_type); + return -1; + + out_del: + /* the force-unlock/cancel has completed and we haven't recvd a reply + to the op that was in progress prior to the unlock/cancel; we + give up on any reply to the earlier op. FIXME: not sure when/how + this would happen */ + + if (overlap_done && lkb->lkb_wait_type) { + log_error(ls, "remove_from_waiters %x reply %d give up on %d", + lkb->lkb_id, mstype, lkb->lkb_wait_type); + lkb->lkb_wait_count--; + lkb->lkb_wait_type = 0; } - lkb->lkb_wait_type = 0; + + DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb);); + lkb->lkb_flags &= ~DLM_IFL_RESEND; - list_del(&lkb->lkb_wait_reply); + lkb->lkb_wait_count--; + if (!lkb->lkb_wait_count) + list_del_init(&lkb->lkb_wait_reply); unhold_lkb(lkb); - out: - return error; + return 0; } -static int remove_from_waiters(struct dlm_lkb *lkb) +static int remove_from_waiters(struct dlm_lkb *lkb, int mstype) { struct dlm_ls *ls = lkb->lkb_resource->res_ls; int error; mutex_lock(&ls->ls_waiters_mutex); - error = _remove_from_waiters(lkb); + error = _remove_from_waiters(lkb, mstype); mutex_unlock(&ls->ls_waiters_mutex); return error; } +/* Handles situations where we might be processing a "fake" or "stub" reply in + which we can't try to take waiters_mutex again. */ + +static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms) +{ + struct dlm_ls *ls = lkb->lkb_resource->res_ls; + int error; + + if (ms != &ls->ls_stub_ms) + mutex_lock(&ls->ls_waiters_mutex); + error = _remove_from_waiters(lkb, ms->m_type); + if (ms != &ls->ls_stub_ms) + mutex_unlock(&ls->ls_waiters_mutex); + return error; +} + static void dir_remove(struct dlm_rsb *r) { int to_nodeid; @@ -988,8 +1125,14 @@ static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb) _remove_lock(r, lkb); } -static void revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) +/* returns: 0 did nothing + 1 moved lock to granted + -1 removed lock */ + +static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) { + int rv = 0; + lkb->lkb_rqmode = DLM_LOCK_IV; switch (lkb->lkb_status) { @@ -997,6 +1140,7 @@ static void revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) break; case DLM_LKSTS_CONVERT: move_lkb(r, lkb, DLM_LKSTS_GRANTED); + rv = 1; break; case DLM_LKSTS_WAITING: del_lkb(r, lkb); @@ -1004,15 +1148,17 @@ static void revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) /* this unhold undoes the original ref from create_lkb() so this leads to the lkb being freed */ unhold_lkb(lkb); + rv = -1; break; default: log_print("invalid status for revert %d", lkb->lkb_status); } + return rv; } -static void revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb) +static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb) { - revert_lock(r, lkb); + return revert_lock(r, lkb); } static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) @@ -1055,6 +1201,50 @@ static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb) queue_cast(r, lkb, 0); } +/* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to + change the granted/requested modes. We're munging things accordingly in + the process copy. + CONVDEADLK: our grmode may have been forced down to NL to resolve a + conversion deadlock + ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become + compatible with other granted locks */ + +static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms) +{ + if (ms->m_type != DLM_MSG_CONVERT_REPLY) { + log_print("munge_demoted %x invalid reply type %d", + lkb->lkb_id, ms->m_type); + return; + } + + if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) { + log_print("munge_demoted %x invalid modes gr %d rq %d", + lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode); + return; + } + + lkb->lkb_grmode = DLM_LOCK_NL; +} + +static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms) +{ + if (ms->m_type != DLM_MSG_REQUEST_REPLY && + ms->m_type != DLM_MSG_GRANT) { + log_print("munge_altmode %x invalid reply type %d", + lkb->lkb_id, ms->m_type); + return; + } + + if (lkb->lkb_exflags & DLM_LKF_ALTPR) + lkb->lkb_rqmode = DLM_LOCK_PR; + else if (lkb->lkb_exflags & DLM_LKF_ALTCW) + lkb->lkb_rqmode = DLM_LOCK_CW; + else { + log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags); + dlm_print_lkb(lkb); + } +} + static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head) { struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb, @@ -1499,7 +1689,7 @@ static void process_lookup_list(struct dlm_rsb *r) struct dlm_lkb *lkb, *safe; list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) { - list_del(&lkb->lkb_rsb_lookup); + list_del_init(&lkb->lkb_rsb_lookup); _request_lock(r, lkb); schedule(); } @@ -1530,7 +1720,7 @@ static void confirm_master(struct dlm_rsb *r, int error) if (!list_empty(&r->res_lookup)) { lkb = list_entry(r->res_lookup.next, struct dlm_lkb, lkb_rsb_lookup); - list_del(&lkb->lkb_rsb_lookup); + list_del_init(&lkb->lkb_rsb_lookup); r->res_first_lkid = lkb->lkb_id; _request_lock(r, lkb); } else @@ -1614,6 +1804,9 @@ static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args) DLM_LKF_FORCEUNLOCK)) return -EINVAL; + if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK) + return -EINVAL; + args->flags = flags; args->astparam = (long) astarg; return 0; @@ -1638,6 +1831,9 @@ static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, if (lkb->lkb_wait_type) goto out; + + if (is_overlap(lkb)) + goto out; } lkb->lkb_exflags = args->flags; @@ -1654,35 +1850,126 @@ static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, return rv; } +/* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0 + for success */ + +/* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here + because there may be a lookup in progress and it's valid to do + cancel/unlockf on it */ + static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args) { + struct dlm_ls *ls = lkb->lkb_resource->res_ls; int rv = -EINVAL; - if (lkb->lkb_flags & DLM_IFL_MSTCPY) + if (lkb->lkb_flags & DLM_IFL_MSTCPY) { + log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id); + dlm_print_lkb(lkb); goto out; + } - if (args->flags & DLM_LKF_FORCEUNLOCK) - goto out_ok; + /* an lkb may still exist even though the lock is EOL'ed due to a + cancel, unlock or failed noqueue request; an app can't use these + locks; return same error as if the lkid had not been found at all */ - if (args->flags & DLM_LKF_CANCEL && - lkb->lkb_status == DLM_LKSTS_GRANTED) + if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) { + log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id); + rv = -ENOENT; goto out; + } - if (!(args->flags & DLM_LKF_CANCEL) && - lkb->lkb_status != DLM_LKSTS_GRANTED) - goto out; + /* an lkb may be waiting for an rsb lookup to complete where the + lookup was initiated by another lock */ + + if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) { + if (!list_empty(&lkb->lkb_rsb_lookup)) { + log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id); + list_del_init(&lkb->lkb_rsb_lookup); + queue_cast(lkb->lkb_resource, lkb, + args->flags & DLM_LKF_CANCEL ? + -DLM_ECANCEL : -DLM_EUNLOCK); + unhold_lkb(lkb); /* undoes create_lkb() */ + rv = -EBUSY; + goto out; + } + } + + /* cancel not allowed with another cancel/unlock in progress */ + + if (args->flags & DLM_LKF_CANCEL) { + if (lkb->lkb_exflags & DLM_LKF_CANCEL) + goto out; + + if (is_overlap(lkb)) + goto out; + + if (lkb->lkb_flags & DLM_IFL_RESEND) { + lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL; + rv = -EBUSY; + goto out; + } + + switch (lkb->lkb_wait_type) { + case DLM_MSG_LOOKUP: + case DLM_MSG_REQUEST: + lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL; + rv = -EBUSY; + goto out; + case DLM_MSG_UNLOCK: + case DLM_MSG_CANCEL: + goto out; + } + /* add_to_waiters() will set OVERLAP_CANCEL */ + goto out_ok; + } + + /* do we need to allow a force-unlock if there's a normal unlock + already in progress? in what conditions could the normal unlock + fail such that we'd want to send a force-unlock to be sure? */ + + if (args->flags & DLM_LKF_FORCEUNLOCK) { + if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK) + goto out; + + if (is_overlap_unlock(lkb)) + goto out; + if (lkb->lkb_flags & DLM_IFL_RESEND) { + lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK; + rv = -EBUSY; + goto out; + } + + switch (lkb->lkb_wait_type) { + case DLM_MSG_LOOKUP: + case DLM_MSG_REQUEST: + lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK; + rv = -EBUSY; + goto out; + case DLM_MSG_UNLOCK: + goto out; + } + /* add_to_waiters() will set OVERLAP_UNLOCK */ + goto out_ok; + } + + /* normal unlock not allowed if there's any op in progress */ rv = -EBUSY; - if (lkb->lkb_wait_type) + if (lkb->lkb_wait_type || lkb->lkb_wait_count) goto out; out_ok: - lkb->lkb_exflags = args->flags; + /* an overlapping op shouldn't blow away exflags from other op */ + lkb->lkb_exflags |= args->flags; lkb->lkb_sbflags = 0; lkb->lkb_astparam = args->astparam; - rv = 0; out: + if (rv) + log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv, + lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags, + args->flags, lkb->lkb_wait_type, + lkb->lkb_resource->res_name); return rv; } @@ -1732,9 +2019,24 @@ static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb) goto out; } - if (can_be_queued(lkb)) { - if (is_demoted(lkb)) + /* is_demoted() means the can_be_granted() above set the grmode + to NL, and left us on the granted queue. This auto-demotion + (due to CONVDEADLK) might mean other locks, and/or this lock, are + now grantable. We have to try to grant other converting locks + before we try again to grant this one. */ + + if (is_demoted(lkb)) { + grant_pending_convert(r, DLM_LOCK_IV); + if (_can_be_granted(r, lkb, 1)) { + grant_lock(r, lkb); + queue_cast(r, lkb, 0); grant_pending_locks(r); + goto out; + } + /* else fall through and move to convert queue */ + } + + if (can_be_queued(lkb)) { error = -EINPROGRESS; del_lkb(r, lkb); add_lkb(r, lkb, DLM_LKSTS_CONVERT); @@ -1759,17 +2061,19 @@ static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb) return -DLM_EUNLOCK; } -/* FIXME: if revert_lock() finds that the lkb is granted, we should - skip the queue_cast(ECANCEL). It indicates that the request/convert - completed (and queued a normal ast) just before the cancel; we don't - want to clobber the sb_result for the normal ast with ECANCEL. */ +/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */ static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb) { - revert_lock(r, lkb); - queue_cast(r, lkb, -DLM_ECANCEL); - grant_pending_locks(r); - return -DLM_ECANCEL; + int error; + + error = revert_lock(r, lkb); + if (error) { + queue_cast(r, lkb, -DLM_ECANCEL); + grant_pending_locks(r); + return -DLM_ECANCEL; + } + return 0; } /* @@ -2035,6 +2339,8 @@ int dlm_unlock(dlm_lockspace_t *lockspace, if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL) error = 0; + if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK))) + error = 0; out_put: dlm_put_lkb(lkb); out: @@ -2065,31 +2371,14 @@ int dlm_unlock(dlm_lockspace_t *lockspace, * receive_lookup_reply send_lookup_reply */ -static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb, - int to_nodeid, int mstype, - struct dlm_message **ms_ret, - struct dlm_mhandle **mh_ret) +static int _create_message(struct dlm_ls *ls, int mb_len, + int to_nodeid, int mstype, + struct dlm_message **ms_ret, + struct dlm_mhandle **mh_ret) { struct dlm_message *ms; struct dlm_mhandle *mh; char *mb; - int mb_len = sizeof(struct dlm_message); - - switch (mstype) { - case DLM_MSG_REQUEST: - case DLM_MSG_LOOKUP: - case DLM_MSG_REMOVE: - mb_len += r->res_length; - break; - case DLM_MSG_CONVERT: - case DLM_MSG_UNLOCK: - case DLM_MSG_REQUEST_REPLY: - case DLM_MSG_CONVERT_REPLY: - case DLM_MSG_GRANT: - if (lkb && lkb->lkb_lvbptr) - mb_len += r->res_ls->ls_lvblen; - break; - } /* get_buffer gives us a message handle (mh) that we need to pass into lowcomms_commit and a message buffer (mb) that we @@ -2104,7 +2393,7 @@ static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb, ms = (struct dlm_message *) mb; ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR); - ms->m_header.h_lockspace = r->res_ls->ls_global_id; + ms->m_header.h_lockspace = ls->ls_global_id; ms->m_header.h_nodeid = dlm_our_nodeid(); ms->m_header.h_length = mb_len; ms->m_header.h_cmd = DLM_MSG; @@ -2116,6 +2405,33 @@ static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb, return 0; } +static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb, + int to_nodeid, int mstype, + struct dlm_message **ms_ret, + struct dlm_mhandle **mh_ret) +{ + int mb_len = sizeof(struct dlm_message); + + switch (mstype) { + case DLM_MSG_REQUEST: + case DLM_MSG_LOOKUP: + case DLM_MSG_REMOVE: + mb_len += r->res_length; + break; + case DLM_MSG_CONVERT: + case DLM_MSG_UNLOCK: + case DLM_MSG_REQUEST_REPLY: + case DLM_MSG_CONVERT_REPLY: + case DLM_MSG_GRANT: + if (lkb && lkb->lkb_lvbptr) + mb_len += r->res_ls->ls_lvblen; + break; + } + + return _create_message(r->res_ls, mb_len, to_nodeid, mstype, + ms_ret, mh_ret); +} + /* further lowcomms enhancements or alternate implementations may make the return value from this function useful at some point */ @@ -2176,7 +2492,9 @@ static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype) struct dlm_mhandle *mh; int to_nodeid, error; - add_to_waiters(lkb, mstype); + error = add_to_waiters(lkb, mstype); + if (error) + return error; to_nodeid = r->res_nodeid; @@ -2192,7 +2510,7 @@ static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype) return 0; fail: - remove_from_waiters(lkb); + remove_from_waiters(lkb, msg_reply_type(mstype)); return error; } @@ -2209,7 +2527,8 @@ static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb) /* down conversions go without a reply from the master */ if (!error && down_conversion(lkb)) { - remove_from_waiters(lkb); + remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY); + r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY; r->res_ls->ls_stub_ms.m_result = 0; r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags; __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms); @@ -2280,7 +2599,9 @@ static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb) struct dlm_mhandle *mh; int to_nodeid, error; - add_to_waiters(lkb, DLM_MSG_LOOKUP); + error = add_to_waiters(lkb, DLM_MSG_LOOKUP); + if (error) + return error; to_nodeid = dlm_dir_nodeid(r); @@ -2296,7 +2617,7 @@ static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb) return 0; fail: - remove_from_waiters(lkb); + remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY); return error; } @@ -2656,6 +2977,8 @@ static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms) lock_rsb(r); receive_flags_reply(lkb, ms); + if (is_altmode(lkb)) + munge_altmode(lkb, ms); grant_lock_pc(r, lkb, ms); queue_cast(r, lkb, 0); @@ -2736,11 +3059,16 @@ static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms) dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len); } +static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms) +{ + do_purge(ls, ms->m_nodeid, ms->m_pid); +} + static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) { struct dlm_lkb *lkb; struct dlm_rsb *r; - int error, mstype; + int error, mstype, result; error = find_lkb(ls, ms->m_remid, &lkb); if (error) { @@ -2749,20 +3077,15 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) } DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); - mstype = lkb->lkb_wait_type; - error = remove_from_waiters(lkb); - if (error) { - log_error(ls, "receive_request_reply not on waiters"); - goto out; |