diff options
Diffstat (limited to 'fs/ocfs2/dlmglue.c')
| -rw-r--r-- | fs/ocfs2/dlmglue.c | 740 |
1 files changed, 387 insertions, 353 deletions
diff --git a/fs/ocfs2/dlmglue.c b/fs/ocfs2/dlmglue.c index c5e4a49e3a1..52cfe99ae05 100644 --- a/fs/ocfs2/dlmglue.c +++ b/fs/ocfs2/dlmglue.c @@ -64,7 +64,7 @@ struct ocfs2_mask_waiter { unsigned long mw_mask; unsigned long mw_goal; #ifdef CONFIG_OCFS2_FS_STATS - unsigned long long mw_lock_start; + ktime_t mw_lock_start; #endif }; @@ -297,6 +297,11 @@ static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres) lockres->l_type == OCFS2_LOCK_TYPE_OPEN; } +static inline struct ocfs2_lock_res *ocfs2_lksb_to_lock_res(struct ocfs2_dlm_lksb *lksb) +{ + return container_of(lksb, struct ocfs2_lock_res, l_lksb); +} + static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres) { BUG_ON(!ocfs2_is_inode_lock(lockres)); @@ -392,8 +397,6 @@ static void ocfs2_build_lock_name(enum ocfs2_lock_type type, { int len; - mlog_entry_void(); - BUG_ON(type >= OCFS2_NUM_LOCK_TYPES); len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x", @@ -403,8 +406,6 @@ static void ocfs2_build_lock_name(enum ocfs2_lock_type type, BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1)); mlog(0, "built lock resource with name: %s\n", name); - - mlog_exit_void(); } static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock); @@ -430,44 +431,41 @@ static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res) #ifdef CONFIG_OCFS2_FS_STATS static void ocfs2_init_lock_stats(struct ocfs2_lock_res *res) { - res->l_lock_num_prmode = 0; - res->l_lock_num_prmode_failed = 0; - res->l_lock_total_prmode = 0; - res->l_lock_max_prmode = 0; - res->l_lock_num_exmode = 0; - res->l_lock_num_exmode_failed = 0; - res->l_lock_total_exmode = 0; - res->l_lock_max_exmode = 0; res->l_lock_refresh = 0; + memset(&res->l_lock_prmode, 0, sizeof(struct ocfs2_lock_stats)); + memset(&res->l_lock_exmode, 0, sizeof(struct ocfs2_lock_stats)); } static void ocfs2_update_lock_stats(struct ocfs2_lock_res *res, int level, struct ocfs2_mask_waiter *mw, int ret) { - unsigned long long *num, *sum; - unsigned int *max, *failed; - struct timespec ts = current_kernel_time(); - unsigned long long time = timespec_to_ns(&ts) - mw->mw_lock_start; - - if (level == LKM_PRMODE) { - num = &res->l_lock_num_prmode; - sum = &res->l_lock_total_prmode; - max = &res->l_lock_max_prmode; - failed = &res->l_lock_num_prmode_failed; - } else if (level == LKM_EXMODE) { - num = &res->l_lock_num_exmode; - sum = &res->l_lock_total_exmode; - max = &res->l_lock_max_exmode; - failed = &res->l_lock_num_exmode_failed; - } else + u32 usec; + ktime_t kt; + struct ocfs2_lock_stats *stats; + + if (level == LKM_PRMODE) + stats = &res->l_lock_prmode; + else if (level == LKM_EXMODE) + stats = &res->l_lock_exmode; + else return; - (*num)++; - (*sum) += time; - if (time > *max) - *max = time; + kt = ktime_sub(ktime_get(), mw->mw_lock_start); + usec = ktime_to_us(kt); + + stats->ls_gets++; + stats->ls_total += ktime_to_ns(kt); + /* overflow */ + if (unlikely(stats->ls_gets == 0)) { + stats->ls_gets++; + stats->ls_total = ktime_to_ns(kt); + } + + if (stats->ls_max < usec) + stats->ls_max = usec; + if (ret) - (*failed)++; + stats->ls_fail++; } static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres) @@ -477,8 +475,7 @@ static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres) static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw) { - struct timespec ts = current_kernel_time(); - mw->mw_lock_start = timespec_to_ns(&ts); + mw->mw_lock_start = ktime_get(); } #else static inline void ocfs2_init_lock_stats(struct ocfs2_lock_res *res) @@ -724,8 +721,6 @@ void ocfs2_refcount_lock_res_init(struct ocfs2_lock_res *lockres, void ocfs2_lock_res_free(struct ocfs2_lock_res *res) { - mlog_entry_void(); - if (!(res->l_flags & OCFS2_LOCK_INITIALIZED)) return; @@ -751,14 +746,11 @@ void ocfs2_lock_res_free(struct ocfs2_lock_res *res) memset(&res->l_lksb, 0, sizeof(res->l_lksb)); res->l_flags = 0UL; - mlog_exit_void(); } static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres, int level) { - mlog_entry_void(); - BUG_ON(!lockres); switch(level) { @@ -771,15 +763,11 @@ static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres, default: BUG(); } - - mlog_exit_void(); } static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres, int level) { - mlog_entry_void(); - BUG_ON(!lockres); switch(level) { @@ -794,7 +782,6 @@ static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres, default: BUG(); } - mlog_exit_void(); } /* WARNING: This function lives in a world where the only three lock @@ -841,8 +828,6 @@ static void lockres_clear_flags(struct ocfs2_lock_res *lockres, static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres) { - mlog_entry_void(); - BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY)); BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED)); BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED)); @@ -855,14 +840,10 @@ static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED); } lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); - - mlog_exit_void(); } static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres) { - mlog_entry_void(); - BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY)); BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED)); @@ -875,15 +856,19 @@ static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lo lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); lockres->l_level = lockres->l_requested; - lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); - mlog_exit_void(); + /* + * We set the OCFS2_LOCK_UPCONVERT_FINISHING flag before clearing + * the OCFS2_LOCK_BUSY flag to prevent the dc thread from + * downconverting the lock before the upconvert has fully completed. + */ + lockres_or_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING); + + lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); } static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres) { - mlog_entry_void(); - BUG_ON((!(lockres->l_flags & OCFS2_LOCK_BUSY))); BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED); @@ -895,20 +880,15 @@ static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *loc lockres->l_level = lockres->l_requested; lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED); lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); - - mlog_exit_void(); } static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level) { int needs_downconvert = 0; - mlog_entry_void(); assert_spin_locked(&lockres->l_lock); - lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED); - if (level > lockres->l_blocking) { /* only schedule a downconvert if we haven't already scheduled * one that goes low enough to satisfy the level we're @@ -921,7 +901,13 @@ static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, lockres->l_blocking = level; } - mlog_exit(needs_downconvert); + mlog(ML_BASTS, "lockres %s, block %d, level %d, l_block %d, dwn %d\n", + lockres->l_name, level, lockres->l_level, lockres->l_blocking, + needs_downconvert); + + if (needs_downconvert) + lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED); + mlog(0, "needs_downconvert = %d\n", needs_downconvert); return needs_downconvert; } @@ -1031,18 +1017,17 @@ static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres) return lockres->l_pending_gen; } - -static void ocfs2_blocking_ast(void *opaque, int level) +static void ocfs2_blocking_ast(struct ocfs2_dlm_lksb *lksb, int level) { - struct ocfs2_lock_res *lockres = opaque; + struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb); struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres); int needs_downconvert; unsigned long flags; BUG_ON(level <= DLM_LOCK_NL); - mlog(0, "BAST fired for lockres %s, blocking %d, level %d type %s\n", - lockres->l_name, level, lockres->l_level, + mlog(ML_BASTS, "BAST fired for lockres %s, blocking %d, level %d, " + "type %s\n", lockres->l_name, level, lockres->l_level, ocfs2_lock_type_string(lockres->l_type)); /* @@ -1063,9 +1048,9 @@ static void ocfs2_blocking_ast(void *opaque, int level) ocfs2_wake_downconvert_thread(osb); } -static void ocfs2_locking_ast(void *opaque) +static void ocfs2_locking_ast(struct ocfs2_dlm_lksb *lksb) { - struct ocfs2_lock_res *lockres = opaque; + struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb); struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres); unsigned long flags; int status; @@ -1086,6 +1071,10 @@ static void ocfs2_locking_ast(void *opaque) return; } + mlog(ML_BASTS, "AST fired for lockres %s, action %d, unlock %d, " + "level %d => %d\n", lockres->l_name, lockres->l_action, + lockres->l_unlock_action, lockres->l_level, lockres->l_requested); + switch(lockres->l_action) { case OCFS2_AST_ATTACH: ocfs2_generic_handle_attach_action(lockres); @@ -1098,8 +1087,8 @@ static void ocfs2_locking_ast(void *opaque) ocfs2_generic_handle_downconvert_action(lockres); break; default: - mlog(ML_ERROR, "lockres %s: ast fired with invalid action: %u " - "lockres flags = 0x%lx, unlock action: %u\n", + mlog(ML_ERROR, "lockres %s: AST fired with invalid action: %u, " + "flags 0x%lx, unlock: %u\n", lockres->l_name, lockres->l_action, lockres->l_flags, lockres->l_unlock_action); BUG(); @@ -1125,14 +1114,91 @@ out: spin_unlock_irqrestore(&lockres->l_lock, flags); } +static void ocfs2_unlock_ast(struct ocfs2_dlm_lksb *lksb, int error) +{ + struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb); + unsigned long flags; + + mlog(ML_BASTS, "UNLOCK AST fired for lockres %s, action = %d\n", + lockres->l_name, lockres->l_unlock_action); + + spin_lock_irqsave(&lockres->l_lock, flags); + if (error) { + mlog(ML_ERROR, "Dlm passes error %d for lock %s, " + "unlock_action %d\n", error, lockres->l_name, + lockres->l_unlock_action); + spin_unlock_irqrestore(&lockres->l_lock, flags); + return; + } + + switch(lockres->l_unlock_action) { + case OCFS2_UNLOCK_CANCEL_CONVERT: + mlog(0, "Cancel convert success for %s\n", lockres->l_name); + lockres->l_action = OCFS2_AST_INVALID; + /* Downconvert thread may have requeued this lock, we + * need to wake it. */ + if (lockres->l_flags & OCFS2_LOCK_BLOCKED) + ocfs2_wake_downconvert_thread(ocfs2_get_lockres_osb(lockres)); + break; + case OCFS2_UNLOCK_DROP_LOCK: + lockres->l_level = DLM_LOCK_IV; + break; + default: + BUG(); + } + + lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); + lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; + wake_up(&lockres->l_event); + spin_unlock_irqrestore(&lockres->l_lock, flags); +} + +/* + * This is the filesystem locking protocol. It provides the lock handling + * hooks for the underlying DLM. It has a maximum version number. + * The version number allows interoperability with systems running at + * the same major number and an equal or smaller minor number. + * + * Whenever the filesystem does new things with locks (adds or removes a + * lock, orders them differently, does different things underneath a lock), + * the version must be changed. The protocol is negotiated when joining + * the dlm domain. A node may join the domain if its major version is + * identical to all other nodes and its minor version is greater than + * or equal to all other nodes. When its minor version is greater than + * the other nodes, it will run at the minor version specified by the + * other nodes. + * + * If a locking change is made that will not be compatible with older + * versions, the major number must be increased and the minor version set + * to zero. If a change merely adds a behavior that can be disabled when + * speaking to older versions, the minor version must be increased. If a + * change adds a fully backwards compatible change (eg, LVB changes that + * are just ignored by older versions), the version does not need to be + * updated. + */ +static struct ocfs2_locking_protocol lproto = { + .lp_max_version = { + .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR, + .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR, + }, + .lp_lock_ast = ocfs2_locking_ast, + .lp_blocking_ast = ocfs2_blocking_ast, + .lp_unlock_ast = ocfs2_unlock_ast, +}; + +void ocfs2_set_locking_protocol(void) +{ + ocfs2_stack_glue_set_max_proto_version(&lproto.lp_max_version); +} + static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres, int convert) { unsigned long flags; - mlog_entry_void(); spin_lock_irqsave(&lockres->l_lock, flags); lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); + lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING); if (convert) lockres->l_action = OCFS2_AST_INVALID; else @@ -1140,7 +1206,6 @@ static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres, spin_unlock_irqrestore(&lockres->l_lock, flags); wake_up(&lockres->l_event); - mlog_exit_void(); } /* Note: If we detect another process working on the lock (i.e., @@ -1156,8 +1221,6 @@ static int ocfs2_lock_create(struct ocfs2_super *osb, unsigned long flags; unsigned int gen; - mlog_entry_void(); - mlog(0, "lock %s, level = %d, flags = %u\n", lockres->l_name, level, dlm_flags); @@ -1179,8 +1242,7 @@ static int ocfs2_lock_create(struct ocfs2_super *osb, &lockres->l_lksb, dlm_flags, lockres->l_name, - OCFS2_LOCK_ID_MAX_LEN - 1, - lockres); + OCFS2_LOCK_ID_MAX_LEN - 1); lockres_clear_pending(lockres, gen, osb); if (ret) { ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); @@ -1190,7 +1252,6 @@ static int ocfs2_lock_create(struct ocfs2_super *osb, mlog(0, "lock %s, return from ocfs2_dlm_lock\n", lockres->l_name); bail: - mlog_exit(ret); return ret; } @@ -1243,7 +1304,7 @@ static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw) { wait_for_completion(&mw->mw_complete); /* Re-arm the completion in case we want to wait on it again */ - INIT_COMPLETION(mw->mw_complete); + reinit_completion(&mw->mw_complete); return mw->mw_status; } @@ -1294,7 +1355,7 @@ static int ocfs2_wait_for_mask_interruptible(struct ocfs2_mask_waiter *mw, else ret = mw->mw_status; /* Re-arm the completion in case we want to wait on it again */ - INIT_COMPLETION(mw->mw_complete); + reinit_completion(&mw->mw_complete); return ret; } @@ -1313,8 +1374,6 @@ static int __ocfs2_cluster_lock(struct ocfs2_super *osb, unsigned int gen; int noqueue_attempted = 0; - mlog_entry_void(); - ocfs2_init_mask_waiter(&mw); if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) @@ -1323,13 +1382,13 @@ static int __ocfs2_cluster_lock(struct ocfs2_super *osb, again: wait = 0; + spin_lock_irqsave(&lockres->l_lock, flags); + if (catch_signals && signal_pending(current)) { ret = -ERESTARTSYS; - goto out; + goto unlock; } - spin_lock_irqsave(&lockres->l_lock, flags); - mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING, "Cluster lock called on freeing lockres %s! flags " "0x%lx\n", lockres->l_name, lockres->l_flags); @@ -1346,6 +1405,25 @@ again: goto unlock; } + if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING) { + /* + * We've upconverted. If the lock now has a level we can + * work with, we take it. If, however, the lock is not at the + * required level, we go thru the full cycle. One way this could + * happen is if a process requesting an upconvert to PR is + * closely followed by another requesting upconvert to an EX. + * If the process requesting EX lands here, we want it to + * continue attempting to upconvert and let the process + * requesting PR take the lock. + * If multiple processes request upconvert to PR, the first one + * here will take the lock. The others will have to go thru the + * OCFS2_LOCK_BLOCKED check to ensure that there is no pending + * downconvert request. + */ + if (level <= lockres->l_level) + goto update_holders; + } + if (lockres->l_flags & OCFS2_LOCK_BLOCKED && !ocfs2_may_continue_on_blocked_lock(lockres, level)) { /* is the lock is currently blocked on behalf of @@ -1383,7 +1461,7 @@ again: BUG_ON(level == DLM_LOCK_IV); BUG_ON(level == DLM_LOCK_NL); - mlog(0, "lock %s, convert from %d to level = %d\n", + mlog(ML_BASTS, "lockres %s, convert from %d to %d\n", lockres->l_name, lockres->l_level, level); /* call dlm_lock to upgrade lock now */ @@ -1392,8 +1470,7 @@ again: &lockres->l_lksb, lkm_flags, lockres->l_name, - OCFS2_LOCK_ID_MAX_LEN - 1, - lockres); + OCFS2_LOCK_ID_MAX_LEN - 1); lockres_clear_pending(lockres, gen, osb); if (ret) { if (!(lkm_flags & DLM_LKF_NOQUEUE) || @@ -1416,11 +1493,14 @@ again: goto again; } +update_holders: /* Ok, if we get here then we're good to go. */ ocfs2_inc_holders(lockres, level); ret = 0; unlock: + lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING); + spin_unlock_irqrestore(&lockres->l_lock, flags); out: /* @@ -1459,7 +1539,6 @@ out: caller_ip); } #endif - mlog_exit(ret); return ret; } @@ -1481,7 +1560,6 @@ static void __ocfs2_cluster_unlock(struct ocfs2_super *osb, { unsigned long flags; - mlog_entry_void(); spin_lock_irqsave(&lockres->l_lock, flags); ocfs2_dec_holders(lockres, level); ocfs2_downconvert_on_unlock(osb, lockres); @@ -1490,7 +1568,6 @@ static void __ocfs2_cluster_unlock(struct ocfs2_super *osb, if (lockres->l_lockdep_map.key != NULL) rwsem_release(&lockres->l_lockdep_map, 1, caller_ip); #endif - mlog_exit_void(); } static int ocfs2_create_new_lock(struct ocfs2_super *osb, @@ -1524,8 +1601,6 @@ int ocfs2_create_new_inode_locks(struct inode *inode) BUG_ON(!inode); BUG_ON(!ocfs2_inode_is_new(inode)); - mlog_entry_void(); - mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno); /* NOTE: That we don't increment any of the holder counts, nor @@ -1559,7 +1634,6 @@ int ocfs2_create_new_inode_locks(struct inode *inode) } bail: - mlog_exit(ret); return ret; } @@ -1571,16 +1645,12 @@ int ocfs2_rw_lock(struct inode *inode, int write) BUG_ON(!inode); - mlog_entry_void(); - mlog(0, "inode %llu take %s RW lock\n", (unsigned long long)OCFS2_I(inode)->ip_blkno, write ? "EXMODE" : "PRMODE"); - if (ocfs2_mount_local(osb)) { - mlog_exit(0); + if (ocfs2_mount_local(osb)) return 0; - } lockres = &OCFS2_I(inode)->ip_rw_lockres; @@ -1591,7 +1661,6 @@ int ocfs2_rw_lock(struct inode *inode, int write) if (status < 0) mlog_errno(status); - mlog_exit(status); return status; } @@ -1601,16 +1670,12 @@ void ocfs2_rw_unlock(struct inode *inode, int write) struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres; struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); - mlog_entry_void(); - mlog(0, "inode %llu drop %s RW lock\n", (unsigned long long)OCFS2_I(inode)->ip_blkno, write ? "EXMODE" : "PRMODE"); if (!ocfs2_mount_local(osb)) ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level); - - mlog_exit_void(); } /* @@ -1624,12 +1689,10 @@ int ocfs2_open_lock(struct inode *inode) BUG_ON(!inode); - mlog_entry_void(); - mlog(0, "inode %llu take PRMODE open lock\n", (unsigned long long)OCFS2_I(inode)->ip_blkno); - if (ocfs2_mount_local(osb)) + if (ocfs2_is_hard_readonly(osb) || ocfs2_mount_local(osb)) goto out; lockres = &OCFS2_I(inode)->ip_open_lockres; @@ -1640,7 +1703,6 @@ int ocfs2_open_lock(struct inode *inode) mlog_errno(status); out: - mlog_exit(status); return status; } @@ -1652,12 +1714,16 @@ int ocfs2_try_open_lock(struct inode *inode, int write) BUG_ON(!inode); - mlog_entry_void(); - mlog(0, "inode %llu try to take %s open lock\n", (unsigned long long)OCFS2_I(inode)->ip_blkno, write ? "EXMODE" : "PRMODE"); + if (ocfs2_is_hard_readonly(osb)) { + if (write) + status = -EROFS; + goto out; + } + if (ocfs2_mount_local(osb)) goto out; @@ -1675,7 +1741,6 @@ int ocfs2_try_open_lock(struct inode *inode, int write) level, DLM_LKF_NOQUEUE, 0); out: - mlog_exit(status); return status; } @@ -1687,8 +1752,6 @@ void ocfs2_open_unlock(struct inode *inode) struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_open_lockres; struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); - mlog_entry_void(); - mlog(0, "inode %llu drop open lock\n", (unsigned long long)OCFS2_I(inode)->ip_blkno); @@ -1703,7 +1766,7 @@ void ocfs2_open_unlock(struct inode *inode) DLM_LOCK_EX); out: - mlog_exit_void(); + return; } static int ocfs2_flock_handle_signal(struct ocfs2_lock_res *lockres, @@ -1757,7 +1820,7 @@ out: * ocfs2_file_lock() and ocfs2_file_unlock() map to a single pair of * flock() calls. The locking approach this requires is sufficiently * different from all other cluster lock types that we implement a - * seperate path to the "low-level" dlm calls. In particular: + * separate path to the "low-level" dlm calls. In particular: * * - No optimization of lock levels is done - we take at exactly * what's been requested. @@ -1827,8 +1890,7 @@ int ocfs2_file_lock(struct file *file, int ex, int trylock) spin_unlock_irqrestore(&lockres->l_lock, flags); ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb, lkm_flags, - lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1, - lockres); + lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1); if (ret) { if (!trylock || (ret != -EAGAIN)) { ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); @@ -1920,8 +1982,6 @@ static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb, { int kick = 0; - mlog_entry_void(); - /* If we know that another node is waiting on our lock, kick * the downconvert thread * pre-emptively when we reach a release * condition. */ @@ -1942,8 +2002,6 @@ static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb, if (kick) ocfs2_wake_downconvert_thread(osb); - - mlog_exit_void(); } #define OCFS2_SEC_BITS 34 @@ -1972,8 +2030,6 @@ static void __ocfs2_stuff_meta_lvb(struct inode *inode) struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; struct ocfs2_meta_lvb *lvb; - mlog_entry_void(); - lvb = ocfs2_dlm_lvb(&lockres->l_lksb); /* @@ -1989,8 +2045,8 @@ static void __ocfs2_stuff_meta_lvb(struct inode *inode) lvb->lvb_version = OCFS2_LVB_VERSION; lvb->lvb_isize = cpu_to_be64(i_size_read(inode)); lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters); - lvb->lvb_iuid = cpu_to_be32(inode->i_uid); - lvb->lvb_igid = cpu_to_be32(inode->i_gid); + lvb->lvb_iuid = cpu_to_be32(i_uid_read(inode)); + lvb->lvb_igid = cpu_to_be32(i_gid_read(inode)); lvb->lvb_imode = cpu_to_be16(inode->i_mode); lvb->lvb_inlink = cpu_to_be16(inode->i_nlink); lvb->lvb_iatime_packed = @@ -2005,8 +2061,6 @@ static void __ocfs2_stuff_meta_lvb(struct inode *inode) out: mlog_meta_lvb(0, lockres); - - mlog_exit_void(); } static void ocfs2_unpack_timespec(struct timespec *spec, @@ -2022,8 +2076,6 @@ static void ocfs2_refresh_inode_from_lvb(struct inode *inode) struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; struct ocfs2_meta_lvb *lvb; - mlog_entry_void(); - mlog_meta_lvb(0, lockres); lvb = ocfs2_dlm_lvb(&lockres->l_lksb); @@ -2043,10 +2095,10 @@ static void ocfs2_refresh_inode_from_lvb(struct inode *inode) else inode->i_blocks = ocfs2_inode_sector_count(inode); - inode->i_uid = be32_to_cpu(lvb->lvb_iuid); - inode->i_gid = be32_to_cpu(lvb->lvb_igid); + i_uid_write(inode, be32_to_cpu(lvb->lvb_iuid)); + i_gid_write(inode, be32_to_cpu(lvb->lvb_igid)); inode->i_mode = be16_to_cpu(lvb->lvb_imode); - inode->i_nlink = be16_to_cpu(lvb->lvb_inlink); + set_nlink(inode, be16_to_cpu(lvb->lvb_inlink)); ocfs2_unpack_timespec(&inode->i_atime, be64_to_cpu(lvb->lvb_iatime_packed)); ocfs2_unpack_timespec(&inode->i_mtime, @@ -2054,8 +2106,6 @@ static void ocfs2_refresh_inode_from_lvb(struct inode *inode) ocfs2_unpack_timespec(&inode->i_ctime, be64_to_cpu(lvb->lvb_ictime_packed)); spin_unlock(&oi->ip_lock); - - mlog_exit_void(); } static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode, @@ -2082,8 +2132,6 @@ static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres) unsigned long flags; int status = 0; - mlog_entry_void(); - refresh_check: spin_lock_irqsave(&lockres->l_lock, flags); if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) { @@ -2104,7 +2152,7 @@ refresh_check: status = 1; bail: - mlog_exit(status); + mlog(0, "status %d\n", status); return status; } @@ -2114,7 +2162,6 @@ static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockre int status) { unsigned long flags; - mlog_entry_void(); spin_lock_irqsave(&lockres->l_lock, flags); lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING); @@ -2123,8 +2170,6 @@ static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockre spin_unlock_irqrestore(&lockres->l_lock, flags); wake_up(&lockres->l_event); - - mlog_exit_void(); } /* may or may not return a bh if it went to disk. */ @@ -2137,8 +2182,6 @@ static int ocfs2_inode_lock_update(struct inode *inode, struct ocfs2_dinode *fe; struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); - mlog_entry_void(); - if (ocfs2_mount_local(osb)) goto bail; @@ -2207,7 +2250,6 @@ static int ocfs2_inode_lock_update(struct inode *inode, bail_refresh: ocfs2_complete_lock_res_refresh(lockres, status); bail: - mlog_exit(status); return status; } @@ -2251,8 +2293,6 @@ int ocfs2_inode_lock_full_nested(struct inode *inode, BUG_ON(!inode); - mlog_entry_void(); - mlog(0, "inode %llu, take %s META lock\n", (unsigned long long)OCFS2_I(inode)->ip_blkno, ex ? "EXMODE" : "PRMODE"); @@ -2264,7 +2304,7 @@ int ocfs2_inode_lock_full_nested(struct inode *inode, if (ocfs2_is_hard_readonly(osb)) { if (ex) status = -EROFS; - goto bail; + goto getbh; } if (ocfs2_mount_local(osb)) @@ -2282,7 +2322,7 @@ int ocfs2_inode_lock_full_nested(struct inode *inode, status = __ocfs2_cluster_lock(osb, lockres, level, dlm_flags, arg_flags, subclass, _RET_IP_); if (status < 0) { - if (status != -EAGAIN && status != -EIOCBRETRY) + if (status != -EAGAIN) mlog_errno(status); goto bail; } @@ -2322,7 +2362,7 @@ local: mlog_errno(status); goto bail; } - +getbh: if (ret_bh) { status = ocfs2_assign_bh(inode, ret_bh, local_bh); if (status < 0) { @@ -2344,7 +2384,6 @@ bail: if (local_bh) brelse(local_bh); - mlog_exit(status); return status; } @@ -2394,7 +2433,6 @@ int ocfs2_inode_lock_atime(struct inode *inode, { int ret; - mlog_entry_void(); ret = ocfs2_inode_lock(inode, NULL, 0); if (ret < 0) { mlog_errno(ret); @@ -2422,7 +2460,6 @@ int ocfs2_inode_lock_atime(struct inode *inode, } else *level = 0; - mlog_exit(ret); return ret; } @@ -2433,8 +2470,6 @@ void ocfs2_inode_unlock(struct inode *inode, struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_inode_lockres; struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); - mlog_entry_void(); - mlog(0, "inode %llu drop %s META lock\n", (unsigned long long)OCFS2_I(inode)->ip_blkno, ex ? "EXMODE" : "PRMODE"); @@ -2442,8 +2477,6 @@ void ocfs2_inode_unlock(struct inode *inode, if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)) && !ocfs2_mount_local(osb)) ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level); - - mlog_exit_void(); } int ocfs2_orphan_scan_lock(struct ocfs2_super *osb, u32 *seqno) @@ -2494,8 +2527,6 @@ int ocfs2_super_lock(struct ocfs2_super *osb, int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; struct ocfs2_lock_res *lockres = &osb->osb_super_lockres; - mlog_entry_void(); - if (ocfs2_is_hard_readonly(osb)) return -EROFS; @@ -2513,21 +2544,18 @@ int ocfs2_super_lock(struct ocfs2_super *osb, * refreshed, so we do it here. Of course, making sense of * everything is up to the caller :) */ status = ocfs2_should_refresh_lock_res(lockres); - if (status < 0) { - mlog_errno(status); - goto bail; - } if (status) { status = ocfs2_refresh_slot_info(osb); ocfs2_complete_lock_res_refresh(lockres, status); - if (status < 0) + if (status < 0) { + ocfs2_cluster_unlock(osb, lockres, level); mlog_errno(status); + } ocfs2_track_lock_refresh(lockres); } bail: - mlog_exit(status); return status; } @@ -2604,8 +2632,11 @@ int ocfs2_dentry_lock(struct dentry *dentry, int ex) BUG_ON(!dl); - if (ocfs2_is_hard_readonly(osb)) - return -EROFS; + if (ocfs2_is_hard_readonly(osb)) { + if (ex) + return -EROFS; + return 0; + } if (ocfs2_mount_local(osb)) return 0; @@ -2623,7 +2654,7 @@ void ocfs2_dentry_unlock(struct dentry *dentry, int ex) struct ocfs2_dentry_lock *dl = dentry->d_fsdata; struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb); - if (!ocfs2_mount_local(osb)) + if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) ocfs2_cluster_unlock(osb, &dl->dl_lockres, level); } @@ -2746,8 +2777,15 @@ static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos) return iter; } -/* So that debugfs.ocfs2 can determine which format is being used */ -#define OCFS2_DLM_DEBUG_STR_VERSION 2 +/* + * Version is used by debugfs.ocfs2 to determine the format being used + * + * New in version 2 + * - Lock stats printed + * New in version 3 + * - Max time in lock stats is in usecs (instead of nsecs) + */ +#define OCFS2_DLM_DEBUG_STR_VERSION 3 static int ocfs2_dlm_seq_show(struct seq_file *m, void *v) { int i; @@ -2789,18 +2827,18 @@ static int ocfs2_dlm_seq_show(struct seq_file *m, void *v) seq_printf(m, "0x%x\t", lvb[i]); #ifdef CONFIG_OCFS2_FS_STATS -# define lock_num_prmode(_l) (_l)->l_lock_num_prmode -# define lock_num_exmode(_l) (_l)->l_lock_num_exmode -# define lock_num_prmode_failed(_l) (_l)->l_lock_num_prmode_failed -# define lock_num_exmode_failed(_l) (_l)->l_lock_num_exmode_failed -# define lock_total_prmode(_l) (_l)->l_lock_total_prmode -# define lock_total_exmode(_l) (_l)->l_lock_total_exmode -# define lock_max_prmode(_l) (_l)->l_lock_max_prmode -# define lock_max_exmode(_l) (_l)->l_lock_max_exmode -# define lock_refresh(_l) (_l)->l_lock_refresh +# define lock_num_prmode(_l) ((_l)->l_lock_prmode.ls_gets) +# define lock_num_exmode(_l) ((_l)->l_lock_exmode.ls_gets) +# define lock_num_prmode_failed(_l) ((_l)->l_lock_prmode.ls_fail) +# define lock_num_exmode_failed(_l) ((_l)->l_lock_exmode.ls_fail) +# define lock_total_prmode(_l) ((_l)->l_lock_prmode.ls_total) +# define lock_total_exmode(_l) ((_l)->l_lock_exmode.ls_total) +# define lock_max_prmode(_l) ((_l)->l_lock_prmode.ls_max) +# define lock_max_exmode(_l) ((_l)->l_lock_exmode.ls_max) +# define lock_refresh(_l) ((_l)->l_lock_refresh) #else -# define lock_num_prmode(_l) (0ULL) -# define lock_num_exmode(_l) (0ULL) +# define lock_num_prmode(_l) (0) +# define lock_num_exmode(_l) (0) # define lock_num_prmode_failed(_l) (0) # define lock_num_exmode_failed(_l) (0) # define lock_total_prmode(_l) (0ULL) @@ -2810,8 +2848,8 @@ static int ocfs2_dlm_seq_show(struct seq_file *m, void *v) # define lock_refresh(_l) (0) #endif /* The following seq_print was added in version 2 of this output */ - seq_printf(m, "%llu\t" - "%llu\t" + seq_printf(m, "%u\t" + "%u\t" "%u\t" "%u\t" "%llu\t" @@ -2843,7 +2881,7 @@ static const struct seq_operations ocfs2_dlm_seq_ops = { static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file) { - struct seq_file *seq = (struct seq_file *) file->private_data; + struct seq_file *seq = file->private_data; struct ocfs2_dlm_seq_priv *priv = seq->private; struct ocfs2_lock_res *res = &priv->p_iter_res; @@ -2877,7 +2915,7 @@ static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file) goto out; } - seq = (struct seq_file *) file->private_data; + seq = file->private_data; seq->private = priv; ocfs2_add_lockres_tracking(&priv->p_iter_res, @@ -2931,8 +2969,6 @@ int ocfs2_dlm_init(struct ocfs2_super *osb) int status = 0; struct ocfs2_cluster_connection *conn = NULL; - mlog_entry_void(); - if (ocfs2_mount_local(osb)) { osb->node_num = 0; goto local; @@ -2955,16 +2991,18 @@ int ocfs2_dlm_init(struct ocfs2_super *osb) /* for now, uuid == domain */ status = ocfs2_cluster_connect(osb->osb_cluster_stack, + osb->osb_cluster_name, + strlen(osb->osb_cluster_name), osb->uuid_str, strlen(osb->uuid_str), - ocfs2_do_node_down, osb, + &lproto, ocfs2_do_node_down, osb, &conn); if (status) { mlog_errno(status); goto bail; } - status = ocfs2_cluster_this_node(&osb->node_num); + status = ocfs2_cluster_this_node(conn, &osb->node_num); if (status < 0) { mlog_errno(status); mlog(ML_ERROR, @@ -2989,15 +3027,12 @@ bail: kthread_stop(osb->dc_task); } - mlog_exit(status); return status; } void ocfs2_dlm_shutdown(struct ocfs2_super *osb, int hangup_pending) { - mlog_entry_void(); - ocfs2_drop_osb_locks(osb); /* @@ -3020,52 +3055,6 @@ void ocfs2_dlm_shutdown(struct ocfs2_super *osb, osb->cconn = NULL; ocfs2_dlm_shutdown_debug(osb); - - mlog_exit_void(); -} - -static void ocfs2_unlock_ast(void *opaque, int error) -{ - struct ocfs2_lock_res *lockres = opaque; - unsigned long flags; - - mlog_entry_void(); - - mlog(0, "UNLOCK AST called on lock %s, action = %d\n", lockres->l_name, - lockres->l_unlock_action); - - spin_lock_irqsave(&lockres->l_lock, flags); - if (error) { - mlog(ML_ERROR, "Dlm passes error %d for lock %s, " - "unlock_action %d\n", error, lockres->l_name, - lockres->l_unlock_action); - spin_unlock_irqrestore(&lockres->l_lock, flags); - mlog_exit_void(); - return; - } - - switch(lockres->l_unlock_action) { - case OCFS2_UNLOCK_CANCEL_CONVERT: - mlog(0, "Cancel convert success for %s\n", lockres->l_name); - lockres->l_action = OCFS2_AST_INVALID; - /* Downconvert thread may have requeued this lock, we - * need to wake it. */ - if (lockres->l_flags & OCFS2_LOCK_BLOCKED) - ocfs2_wake_downconvert_thread(ocfs2_get_lockres_osb(lockres)); - break; - case OCFS2_UNLOCK_DROP_LOCK: - lockres->l_level = DLM_LOCK_IV; - break; - default: - BUG(); - } - - lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); - lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; - wake_up(&lockres->l_event); - spin_unlock_irqrestore(&lockres->l_lock, flags); - - mlog_exit_void(); } static int ocfs2_drop_lock(struct ocfs2_super *osb, @@ -3135,8 +3124,7 @@ static int ocfs2_drop_lock(struct ocfs2_super *osb, mlog(0, "lock %s\n", lockres->l_name); - ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags, - lockres); + ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags); if (ret) { ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres); mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags); @@ -3148,26 +3136,63 @@ static int ocfs2_drop_lock(struct ocfs2_super *osb, ocfs2_wait_on_busy_lock(lockres); out: - mlog_exit(0); return 0; } +static void ocfs2_process_blocked_lock(struct ocfs2_super *osb, + struct ocfs2_lock_res *lockres); + /* Mark the lockres as being dropped. It will no longer be * queued if blocking, but we still may have to wait on it * being dequeued from the downconvert thread before we can consider - * it safe to drop. + * it safe to drop. * * You can *not* attempt to call cluster_lock on this lockres anymore. */ -void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres) +void ocfs2_mark_lockres_freeing(struct ocfs2_super *osb, + struct ocfs2_lock_res *lockres) { int status; struct ocfs2_mask_waiter mw; - unsigned long flags; + unsigned long flags, flags2; ocfs2_init_mask_waiter(&mw); spin_lock_irqsave(&lockres->l_lock, flags); lockres->l_flags |= OCFS2_LOCK_FREEING; + if (lockres->l_flags & OCFS2_LOCK_QUEUED && current == osb->dc_task) { + /* + * We know the downconvert is queued but not in progress + * because we are the downconvert thread and processing + * different lock. So we can just remove the lock from the + * queue. This is not only an optimization but also a way + * to avoid the following deadlock: + * ocfs2_dentry_post_unlock() + * ocfs2_dentry_lock_put() + * ocfs2_drop_dentry_lock() + * iput() + * ocfs2_evict_inode() + * ocfs2_clear_inode() + * ocfs2_mark_lockres_freeing() + * ... blocks waiting for OCFS2_LOCK_QUEUED + * since we are the downconvert thread which + * should clear the flag. + */ + spin_unlock_irqrestore(&lockres->l_lock, flags); + spin_lock_irqsave(&osb->dc_task_lock, flags2); + list_del_init(&lockres->l_blocked_list); + osb->blocked_lock_count--; + spin_unlock_irqrestore(&osb->dc_task_lock, flags2); + /* + * Warn if we recurse into another post_unlock call. Strictly + * speaking it isn't a problem but we need to be careful if + * that happens (stack overflow, deadlocks, ...) so warn if + * ocfs2 grows a path for which this can happen. + */ + WARN_ON_ONCE(lockres->l_ops->post_unlock); + /* Since the lock is freeing we don't do much in the fn below */ + ocfs2_process_blocked_lock(osb, lockres); + return; + } while (lockres->l_flags & OCFS2_LOCK_QUEUED) { lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0); spin_unlock_irqrestore(&lockres->l_lock, flags); @@ -3188,7 +3213,7 @@ void ocfs2_simple_drop_lockres(struct ocfs2_super *osb, { int ret; - ocfs2_mark_lockres_freeing(lockres); + ocfs2_mark_lockres_freeing(osb, lockres); ret = ocfs2_drop_lock(osb, lockres); if (ret) mlog_errno(ret); @@ -3206,8 +3231,6 @@ int ocfs2_drop_inode_locks(struct inode *inode) { int status, err; - mlog_entry_void(); - /* No need to call ocfs2_mark_lockres_freeing here - * ocfs2_clear_inode has done it for us. */ @@ -3232,7 +3255,6 @@ int ocfs2_drop_inode_locks(struct inode *inode) if (err < 0 && !status) status = err; - mlog_exit(status); return status; } @@ -3244,13 +3266,20 @@ static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres, BUG_ON(lockres->l_blocking <= DLM_LOCK_NL); if (lockres->l_level <= new_level) { - mlog(ML_ERROR, "lockres->l_level (%d) <= new_level (%d)\n", - lockres->l_level, new_level); + mlog(ML_ERROR, "lockres %s, lvl %d <= %d, blcklst %d, mask %d, " + "type %d, flags 0x%lx, hold %d %d, act %d %d, req %d, " + "block %d, pgen %d\n", lockres->l_name, lockres->l_level, + new_level, list_empty(&lockres->l_blocked_list), + list_empty(&lockres->l_mask_waiters), lockres->l_type, + lockres->l_flags, lockres->l_ro_holders, + lockres->l_ex_holders, lockres->l_action, + lockres->l_unlock_action, lockres->l_requested, + lockres->l_blocking, lockres->l_pending_gen); BUG(); } - mlog(0, "lock %s, new_level = %d, l_blocking = %d\n", - lockres->l_name, new_level, lockres->l_blocking); + mlog(ML_BASTS, "lockres %s, level %d => %d, blocking %d\n", + lockres->l_name, lockres->l_level, new_level, lockres->l_blocking); lockres->l_action = OCFS2_AST_DOWNCONVERT; lockres->l_requested = new_level; @@ -3267,7 +3296,8 @@ static int ocfs2_downconvert_lock(struct ocfs2_super *osb, int ret; u32 dlm_flags = DLM_LKF_CONVERT; - mlog_entry_void(); + mlog(ML_BASTS, "lockres %s, level %d => %d\n", lockres->l_name, + lockres->l_level, new_level); if (lvb) dlm_flags |= DLM_LKF_VALBLK; @@ -3277,8 +3307,7 @@ static int ocfs2_downconvert_lock(struct ocfs2_super *osb, &lockres->l_lksb, dlm_flags, lockres->l_name, - OCFS2_LOCK_ID_MAX_LEN - 1, - lockres); + OCFS2_LOCK_ID_MAX_LEN - 1); lockres_clear_pending(lockres, generation, osb); if (ret) { ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); @@ -3288,7 +3317,6 @@ static int ocfs2_downconvert_lock(struct ocfs2_super *osb, ret = 0; bail: - mlog_exit(ret); return ret; } @@ -3298,15 +3326,11 @@ static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb, { assert_spin_locked(&lockres->l_lock); - mlog_entry_void(); - mlog(0, "lock %s\n", lockres->l_name); - if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) { /* If we're already trying to cancel a lock conversion * then just drop the spinlock and allow the caller to * requeue this lock. */ - - mlog(0, "Lockres %s, skip convert\n", lockres->l_name); + mlog(ML_BASTS, "lockres %s, skip convert\n", lockres->l_name); return 0; } @@ -3321,6 +3345,8 @@ static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb, "lock %s, invalid flags: 0x%lx\n", lockres->l_name, lockres->l_flags); + mlog(ML_BASTS, "lockres %s\n", lockres->l_name); + return 1; } @@ -3329,19 +3355,15 @@ static int ocfs2_cancel_convert(struct ocfs2_super *osb, { int ret; - mlog_entry_void(); - mlog(0, "lock %s\n", lockres->l_name); - ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, - DLM_LKF_CANCEL, lockres); + DLM_LKF_CANCEL); if (ret) { ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres); ocfs2_recover_from_dlm_error(lockres, 0); } - mlog(0, "lock %s return from ocfs2_dlm_unlock\n", lockres->l_name); + mlog(ML_BASTS, "lockres %s\n", lockres->l_name); - mlog_exit(ret); return ret; } @@ -3352,17 +3374,24 @@ static int ocfs2_unblock_lock(struct ocfs2_super *osb, unsigned long flags; int blocking; int new_level; + int level; int ret = 0; int set_lvb = 0; unsigned int gen; - mlog_entry_void(); - spin_lock_irqsave(&lockres->l_lock, flags); - BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED)); - recheck: + /* + * Is it still blocking? If not, we have no more work to do. + */ + if (!(lockres->l_flags & OCFS2_LOCK_BLOCKED)) { + BUG_ON(lockres->l_blocking != DLM_LOCK_NL); + spin_unlock_irqrestore(&lockres->l_lock, flags); + ret = 0; + goto leave; + } + if (lockres->l_flags & OCFS2_LOCK_BUSY) { /* XXX * This is a *big* race. The OCFS2_LOCK_PENDING flag @@ -3387,8 +3416,11 @@ recheck: * at the same time they set OCFS2_DLM_BUSY. They must * clear OCFS2_DLM_PENDING after dlm_lock() returns. */ - if (lockres->l_flags & OCFS2_LOCK_PENDING) + if (lockres->l_flags & OCFS2_LOCK_PENDING) { + mlog(ML_BASTS, "lockres %s, ReQ: Pending\n", + lockres->l_name); goto leave_requeue; + } ctl->requeue = 1; ret = ocfs2_prepare_cancel_convert(osb, lockres); @@ -3401,31 +3433,70 @@ recheck: goto leave; } + /* + * This prevents livelocks. OCFS2_LOCK_UPCONVERT_FINISHING flag is + * set when the ast is received for an upconvert just before the + * OCFS2_LOCK_BUSY flag is cleared. Now if the fs received a bast + * on the heels of the ast, we want to delay the downconvert just + * enough to allow the up requestor to do its task. Because this + * lock is in the blocked queue, the lock will be downconverted + * as soon as the requestor is done with the lock. + */ + if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING) + goto leave_requeue; + + /* + * How can we block and yet be at NL? We were trying to upconvert + * from NL and got canceled. The code comes back here, and now + * we notice and clear BLOCKING. + */ + if (lockres->l_level == DLM_LOCK_NL) { + BUG_ON(lockres->l_ex_holders || lockres->l_ro_holders); + mlog(ML_BASTS, "lockres %s, Aborting dc\n", lockres->l_name); + lockres->l_blocking = DLM_LOCK_NL; + lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED); + spin_unlock_irqrestore(&lockres->l_lock, flags); + goto leave; + } + /* if we're blocking an exclusive and we have *any* holders, * then requeue. */ if ((lockres->l_blocking == DLM_LOCK_EX) - && (lockres->l_ex_holders || lockres->l_ro_holders)) + && (lockres->l_ex_holders || lockres->l_ro_holders)) { + mlog(ML_BASTS, "lockres %s, ReQ: EX/PR Holders %u,%u\n", + lockres->l_name, lockres->l_ex_holders, + lockres->l_ro_holders); goto leave_requeue; + } /* If it's a PR we're blocking, then only * requeue if we've got any EX holders */ if (lockres->l_blocking == DLM_LOCK_PR && - lockres->l_ex_holders) + lockres->l_ex_holders) { + mlog(ML_BASTS, "lockres %s, ReQ: EX Holders %u\n", + lockres->l_name, lockres->l_ex_holders); goto leave_requeue; + } /* * Can we get a lock in this state if the holder counts are * zero? The meta data unblock code used to check this. */ if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) - && (lockres->l_flags & OCFS2_LOCK_REFRESHING)) + && (lockres->l_flags & OCFS2_LOCK_REFRESHING)) { + mlog(ML_BASTS, "lockres %s, ReQ: Lock Refreshing\n", + lockres->l_name); goto leave_requeue; + } new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking); if (lockres->l_ops->check_downconvert - && !lockres->l_ops->check_downconvert(lockres, new_level)) + && !lockres->l_ops->check_downconvert(lockres, new_level)) { + mlog(ML_BASTS, "lockres %s, ReQ: Checkpointing\n", + lockres->l_name); goto leave_requeue; + } /* If we get here, then we know that there are no more * incompatible holders (and anyone asking for an incompatible @@ -3438,17 +3509,24 @@ recheck: * may sleep, so we save off a copy of what we're blocking as * it may change while we're not holding the spin lock. */ blocking = lockres->l_blocking; + level = lockres->l_level; spin_unlock_irqrestore(&lockres->l_lock, flags); ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking); - if (ctl->unblock_action == UNBLOCK_STOP_POST) + if (ctl->unblock_action == UNBLOCK_STOP_POST) { + mlog(ML_BASTS, "lockres %s, UNBLOCK_STOP_POST\n", + lockres->l_name); goto leave; + } spin_lock_irqsave(&lockres->l_lock, flags); - if (blocking != lockres->l_blocking) { + if ((blocking != lockres->l_blocking) || (level != lockres->l_level)) { /* If this changed underneath us, then we can't drop * it just yet. */ + mlog(ML_BASTS, "lockres %s, block=%d:%d, level=%d:%d, " + "Recheck\n", lockres->l_name, blocking, + lockres->l_blocking, level, lockres->l_level); goto recheck; } @@ -3475,14 +3553,14 @@ downconvert: gen); leave: - mlog_exit(ret); + if (ret) + mlog_errno(ret); return ret; leave_requeue: spin_unlock_irqrestore(&lockres->l_lock, flags); ctl->requeue = 1; - mlog_exit(0); return 0; } @@ -3491,10 +3569,18 @@ static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres, { struct inode *inode; struct address_space *mapping; + struct ocfs2_inode_info *oi; inode = ocfs2_lock_res_inode(lockres); mapping = inode->i_mapping; + if (S_ISDIR(inode->i_mode)) { + oi = OCFS2_I(inode); + oi->ip_dir_lock_gen++; + mlog(0, "generation: %u\n", oi->ip_dir_lock_gen); + goto out; + } + if (!S_ISREG(inode->i_mode)) goto out; @@ -3707,8 +3793,6 @@ static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres) struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb, oinfo->dqi_gi.dqi_type); - mlog_entry_void(); - lvb = ocfs2_dlm_lvb(&lockres->l_lksb); lvb->lvb_version = OCFS2_QINFO_LVB_VERSION; lvb->lvb_bgrace = cpu_to_be32(info->dqi_bgrace); @@ -3717,8 +3801,6 @@ static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres) lvb->lvb_blocks = cpu_to_be32(oinfo->dqi_gi.dqi_blocks); lvb->lvb_free_blk = cpu_to_be32(oinfo->dqi_gi.dqi_free_blk); lvb->lvb_free_entry = cpu_to_be32(oinfo->dqi_gi.dqi_free_entry); - - mlog_exit_void(); } void ocfs2_qinfo_unlock(struct ocfs2_mem_dqinfo *oinfo, int ex) @@ -3727,10 +3809,8 @@ void ocfs2_qinfo_unlock(struct ocfs2_mem_dqinfo *oinfo, int ex) struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb); int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; - mlog_entry_void(); if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) ocfs2_cluster_unlock(osb, lockres, level); - mlog_exit_void(); } static int ocfs2_refresh_qinfo(struct ocfs2_mem_dqinfo *oinfo) @@ -3753,7 +3833,8 @@ static int ocfs2_refresh_qinfo(struct ocfs2_mem_dqinfo *oinfo) oinfo->dqi_gi.dqi_free_entry = be32_to_cpu(lvb->lvb_free_entry); } else { - status = ocfs2_read_quota_block(oinfo->dqi_gqinode, 0, &bh); + status = ocfs2_read_quota_phys_block(oinfo->dqi_gqinode, + oinfo->dqi_giblk, &bh); if (status) { mlog_errno(status); goto bail; @@ -3784,8 +3865,6 @@ int ocfs2_qinfo_lock(struct ocfs2_mem_dqinfo *oinfo, int ex) int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; int status = 0; - mlog_entry_void(); - /* On RO devices, locking really isn't needed... */ if (ocfs2_is_hard_readonly(osb)) { if (ex) @@ -3808,7 +3887,6 @@ int ocfs2_qinfo_lock(struct ocfs2_mem_dqinfo *oinfo, int ex) ocfs2_qinfo_unlock(oinfo, ex); ocfs2_complete_lock_res_refresh(lockres, status); bail: - mlog_exit(status); return status; } @@ -3843,45 +3921,6 @@ void ocfs2_refcount_unlock(struct ocfs2_refcount_tree *ref_tree, int ex) ocfs2_cluster_unlock(osb, lockres, level); } -/* - * This is the filesystem locking protocol. It provides the lock handling - * hooks for the underlying DLM. It has a maximum version number. - * The version number allows interoperability with systems running at - * the same major number and an equal or smaller minor number. - * - * Whenever the filesystem does new things with locks (adds or removes a - * lock, orders them differently, does different things underneath a lock), - * the version must be changed. The protocol is negotiated when joining - * the dlm domain. A node may join the domain if its major version is - * identical to all other nodes and its minor version is greater than - * or equal to all other nodes. When its minor version is greater than - * the other nodes, it will run at the minor version specified by the - * other nodes. - * - * If a locking change is made that will not be compatible with older - * versions, the major number must be increased and the minor version set - * to zero. If a change merely adds a behavior that can be disabled when - * speaking to older versions, the minor version must be increased. If a - * change adds a fully backwards compatible change (eg, LVB changes that - * are just ignored by older versions), the version does not need to be - * updated. - */ -static struct ocfs2_locking_protocol lproto = { - .lp_max_version = { - .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR, - .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR, - }, - .lp_lock_ast = ocfs2_locking_ast, - .lp_blocking_ast = ocfs2_blocking_ast, - .lp_unlock_ast = ocfs2_unlock_ast, -}; - -void ocfs2_set_locking_protocol(void) -{ - ocfs2_stack_glue_set_locking_protocol(&lproto); -} - - static void ocfs2_process_blocked_lock(struct ocfs2_super *osb, struct ocfs2_lock_res *lockres) { @@ -3893,12 +3932,10 @@ static void ocfs2_process_blocked_lock(struct ocfs2_super *osb, * considered valid until we remove the OCFS2_LOCK_QUEUED * flag. */ - mlog_entry_void(); - BUG_ON(!lockres); BUG_ON(!lockres->l_ops); - mlog(0, "lockres %s blocked.\n", lockres->l_name); + mlog(ML_BASTS, "lockres %s blocked\n", lockres->l_name); /* Detect whether a lock has been marked as going away while * the downconvert thread was processing other things. A lock can @@ -3921,21 +3958,19 @@ unqueue: } else ocfs2_schedule_blocked_lock(osb, lockres); - mlog(0, "lockres %s, requeue = %s.\n", lockres->l_name, + mlog(ML_BASTS, "lockres %s, requeue = %s.\n", lockres->l_name, ctl.requeue ? "yes" : "no"); spin_unlock_irqrestore(&lockres->l_lock, flags); if (ctl.unblock_action != UNBLOCK_CONTINUE && lockres->l_ops->post_unlock) lockres->l_ops->post_unlock(osb, lockres); - - mlog_exit_void(); } static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb, struct ocfs2_lock_res *lockres) { - mlog_entry_void(); + unsigned long flags; assert_spin_locked(&lockres->l_lock); @@ -3943,32 +3978,29 @@ static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb, /* Do not schedule a lock for downconvert when it's on * the way to destruction - any nodes wanting access * to the resource will get it soon. */ - mlog(0, "Lockres %s won't be scheduled: flags 0x%lx\n", + mlog(ML_BASTS, "lockres %s won't be scheduled: flags 0x%lx\n", lockres->l_name, lockres->l_flags); return; } lockres_or_flags(lockres, OCFS2_LOCK_QUEUED); - spin_lock(&osb->dc_task_lock); + spin_lock_irqsave(&osb->dc_task_lock, flags); if (list_empty(&lockres->l_blocked_list)) { list_add_tail(&lockres->l_blocked_list, &osb->blocked_lock_list); osb->blocked_lock_count++; } - spin_unlock(&osb->dc_task_lock); - - mlog_exit_void(); + spin_unlock_irqrestore(&osb->dc_task_lock, flags); } static void ocfs2_downconvert_thread_do_work(struct ocfs2_super *osb) { unsigned long processed; + unsigned long flags; struct ocfs2_lock_res *lockres; - mlog_entry_void(); - - spin_lock(&osb->dc_task_lock); + spin_lock_irqsave(&osb->dc_task_lock, flags); /* grab this early so we know to try again if a state change and * wake happens part-way through our work */ osb->dc_work_sequence = osb->dc_wake_sequence; @@ -3981,40 +4013,40 @@ static void ocfs2_downconvert_thread_do_work(struct ocfs2_super *osb) struct ocfs2_lock_res, l_blocked_list); list_del_init(&lockres->l_blocked_list); osb->blocked_lock_count--; - spin_unlock(&osb->dc_task_lock); + spin_unlock_irqrestore(&osb->dc_task_lock, flags); BUG_ON(!processed); processed--; ocfs2_process_blocked_lock(osb, lockres); - spin_lock(&osb->dc_task_lock); + spin_lock_irqsave(&osb->dc_task_lock, flags); } - spin_unlock(&osb->dc_task_lock); - - mlog_exit_void(); + spin_unlock_irqrestore(&osb->dc_task_lock, flags); } static int ocfs2_downconvert_thread_lists_empty(struct ocfs2_super *osb) { int empty = 0; + unsigned long flags; - spin_lock(&osb->dc_task_lock); + spin_lock_irqsave(&osb->dc_task_lock, flags); if (list_empty(&osb->blocked_lock_list)) empty = 1; - spin_unlock(&osb->dc_task_lock); + spin_unlock_irqrestore(&osb->dc_task_lock, flags); return empty; } static int ocfs2_downconvert_thread_should_wake(struct ocfs2_super *osb) { int should_wake = 0; + unsigned long flags; - spin_lock(&osb->dc_task_lock); + spin_lock_irqsave(&osb->dc_task_lock, flags); if (osb->dc_work_sequence != osb->dc_wake_sequence) should_wake = 1; - spin_unlock(&osb->dc_task_lock); + spin_unlock_irqrestore(&osb->dc_task_lock, flags); return should_wake; } @@ -4044,10 +4076,12 @@ static int ocfs2_downconvert_thread(void *arg) void ocfs2_wake_downconvert_thread(struct ocfs2_super *osb) { - spin_lock(&osb->dc_task_lock); + unsigned long flags; + + spin_lock_irqsave(&osb->dc_task_lock, flags); /* make sure the voting thread gets a swipe at whatever changes * the caller may have made to the voting state */ osb->dc_wake_sequence++; - spin_unlock(&osb->dc_task_lock); + spin_unlock_irqrestore(&osb->dc_task_lock, flags); wake_up(&osb->dc_event); } |
