diff options
Diffstat (limited to 'fs/ocfs2/journal.c')
| -rw-r--r-- | fs/ocfs2/journal.c | 1400 |
1 files changed, 1075 insertions, 325 deletions
diff --git a/fs/ocfs2/journal.c b/fs/ocfs2/journal.c index f31c7e8c19c..4b0c68849b3 100644 --- a/fs/ocfs2/journal.c +++ b/fs/ocfs2/journal.c @@ -28,13 +28,16 @@ #include <linux/slab.h> #include <linux/highmem.h> #include <linux/kthread.h> +#include <linux/time.h> +#include <linux/random.h> +#include <linux/delay.h> -#define MLOG_MASK_PREFIX ML_JOURNAL #include <cluster/masklog.h> #include "ocfs2.h" #include "alloc.h" +#include "blockcheck.h" #include "dir.h" #include "dlmglue.h" #include "extent_map.h" @@ -45,73 +48,297 @@ #include "slot_map.h" #include "super.h" #include "sysfile.h" +#include "uptodate.h" +#include "quota.h" #include "buffer_head_io.h" +#include "ocfs2_trace.h" DEFINE_SPINLOCK(trans_inc_lock); +#define ORPHAN_SCAN_SCHEDULE_TIMEOUT 300000 + static int ocfs2_force_read_journal(struct inode *inode); static int ocfs2_recover_node(struct ocfs2_super *osb, - int node_num); + int node_num, int slot_num); static int __ocfs2_recovery_thread(void *arg); static int ocfs2_commit_cache(struct ocfs2_super *osb); -static int ocfs2_wait_on_mount(struct ocfs2_super *osb); +static int __ocfs2_wait_on_mount(struct ocfs2_super *osb, int quota); static int ocfs2_journal_toggle_dirty(struct ocfs2_super *osb, - int dirty); + int dirty, int replayed); static int ocfs2_trylock_journal(struct ocfs2_super *osb, int slot_num); static int ocfs2_recover_orphans(struct ocfs2_super *osb, int slot); static int ocfs2_commit_thread(void *arg); +static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal, + int slot_num, + struct ocfs2_dinode *la_dinode, + struct ocfs2_dinode *tl_dinode, + struct ocfs2_quota_recovery *qrec); + +static inline int ocfs2_wait_on_mount(struct ocfs2_super *osb) +{ + return __ocfs2_wait_on_mount(osb, 0); +} + +static inline int ocfs2_wait_on_quotas(struct ocfs2_super *osb) +{ + return __ocfs2_wait_on_mount(osb, 1); +} + +/* + * This replay_map is to track online/offline slots, so we could recover + * offline slots during recovery and mount + */ + +enum ocfs2_replay_state { + REPLAY_UNNEEDED = 0, /* Replay is not needed, so ignore this map */ + REPLAY_NEEDED, /* Replay slots marked in rm_replay_slots */ + REPLAY_DONE /* Replay was already queued */ +}; + +struct ocfs2_replay_map { + unsigned int rm_slots; + enum ocfs2_replay_state rm_state; + unsigned char rm_replay_slots[0]; +}; + +void ocfs2_replay_map_set_state(struct ocfs2_super *osb, int state) +{ + if (!osb->replay_map) + return; + + /* If we've already queued the replay, we don't have any more to do */ + if (osb->replay_map->rm_state == REPLAY_DONE) + return; + + osb->replay_map->rm_state = state; +} + +int ocfs2_compute_replay_slots(struct ocfs2_super *osb) +{ + struct ocfs2_replay_map *replay_map; + int i, node_num; + + /* If replay map is already set, we don't do it again */ + if (osb->replay_map) + return 0; + + replay_map = kzalloc(sizeof(struct ocfs2_replay_map) + + (osb->max_slots * sizeof(char)), GFP_KERNEL); + + if (!replay_map) { + mlog_errno(-ENOMEM); + return -ENOMEM; + } + + spin_lock(&osb->osb_lock); + + replay_map->rm_slots = osb->max_slots; + replay_map->rm_state = REPLAY_UNNEEDED; + + /* set rm_replay_slots for offline slot(s) */ + for (i = 0; i < replay_map->rm_slots; i++) { + if (ocfs2_slot_to_node_num_locked(osb, i, &node_num) == -ENOENT) + replay_map->rm_replay_slots[i] = 1; + } + + osb->replay_map = replay_map; + spin_unlock(&osb->osb_lock); + return 0; +} + +void ocfs2_queue_replay_slots(struct ocfs2_super *osb) +{ + struct ocfs2_replay_map *replay_map = osb->replay_map; + int i; + + if (!replay_map) + return; + + if (replay_map->rm_state != REPLAY_NEEDED) + return; + + for (i = 0; i < replay_map->rm_slots; i++) + if (replay_map->rm_replay_slots[i]) + ocfs2_queue_recovery_completion(osb->journal, i, NULL, + NULL, NULL); + replay_map->rm_state = REPLAY_DONE; +} + +void ocfs2_free_replay_slots(struct ocfs2_super *osb) +{ + struct ocfs2_replay_map *replay_map = osb->replay_map; + + if (!osb->replay_map) + return; + + kfree(replay_map); + osb->replay_map = NULL; +} + +int ocfs2_recovery_init(struct ocfs2_super *osb) +{ + struct ocfs2_recovery_map *rm; + + mutex_init(&osb->recovery_lock); + osb->disable_recovery = 0; + osb->recovery_thread_task = NULL; + init_waitqueue_head(&osb->recovery_event); + + rm = kzalloc(sizeof(struct ocfs2_recovery_map) + + osb->max_slots * sizeof(unsigned int), + GFP_KERNEL); + if (!rm) { + mlog_errno(-ENOMEM); + return -ENOMEM; + } + + rm->rm_entries = (unsigned int *)((char *)rm + + sizeof(struct ocfs2_recovery_map)); + osb->recovery_map = rm; + + return 0; +} + +/* we can't grab the goofy sem lock from inside wait_event, so we use + * memory barriers to make sure that we'll see the null task before + * being woken up */ +static int ocfs2_recovery_thread_running(struct ocfs2_super *osb) +{ + mb(); + return osb->recovery_thread_task != NULL; +} + +void ocfs2_recovery_exit(struct ocfs2_super *osb) +{ + struct ocfs2_recovery_map *rm; + + /* disable any new recovery threads and wait for any currently + * running ones to exit. Do this before setting the vol_state. */ + mutex_lock(&osb->recovery_lock); + osb->disable_recovery = 1; + mutex_unlock(&osb->recovery_lock); + wait_event(osb->recovery_event, !ocfs2_recovery_thread_running(osb)); + + /* At this point, we know that no more recovery threads can be + * launched, so wait for any recovery completion work to + * complete. */ + flush_workqueue(ocfs2_wq); + + /* + * Now that recovery is shut down, and the osb is about to be + * freed, the osb_lock is not taken here. + */ + rm = osb->recovery_map; + /* XXX: Should we bug if there are dirty entries? */ + + kfree(rm); +} + +static int __ocfs2_recovery_map_test(struct ocfs2_super *osb, + unsigned int node_num) +{ + int i; + struct ocfs2_recovery_map *rm = osb->recovery_map; + + assert_spin_locked(&osb->osb_lock); + + for (i = 0; i < rm->rm_used; i++) { + if (rm->rm_entries[i] == node_num) + return 1; + } + + return 0; +} + +/* Behaves like test-and-set. Returns the previous value */ +static int ocfs2_recovery_map_set(struct ocfs2_super *osb, + unsigned int node_num) +{ + struct ocfs2_recovery_map *rm = osb->recovery_map; + + spin_lock(&osb->osb_lock); + if (__ocfs2_recovery_map_test(osb, node_num)) { + spin_unlock(&osb->osb_lock); + return 1; + } + + /* XXX: Can this be exploited? Not from o2dlm... */ + BUG_ON(rm->rm_used >= osb->max_slots); + + rm->rm_entries[rm->rm_used] = node_num; + rm->rm_used++; + spin_unlock(&osb->osb_lock); + + return 0; +} + +static void ocfs2_recovery_map_clear(struct ocfs2_super *osb, + unsigned int node_num) +{ + int i; + struct ocfs2_recovery_map *rm = osb->recovery_map; + + spin_lock(&osb->osb_lock); + + for (i = 0; i < rm->rm_used; i++) { + if (rm->rm_entries[i] == node_num) + break; + } + + if (i < rm->rm_used) { + /* XXX: be careful with the pointer math */ + memmove(&(rm->rm_entries[i]), &(rm->rm_entries[i + 1]), + (rm->rm_used - i - 1) * sizeof(unsigned int)); + rm->rm_used--; + } + + spin_unlock(&osb->osb_lock); +} static int ocfs2_commit_cache(struct ocfs2_super *osb) { int status = 0; unsigned int flushed; - unsigned long old_id; struct ocfs2_journal *journal = NULL; - mlog_entry_void(); - journal = osb->journal; /* Flush all pending commits and checkpoint the journal. */ down_write(&journal->j_trans_barrier); - if (atomic_read(&journal->j_num_trans) == 0) { + flushed = atomic_read(&journal->j_num_trans); + trace_ocfs2_commit_cache_begin(flushed); + if (flushed == 0) { up_write(&journal->j_trans_barrier); - mlog(0, "No transactions for me to flush!\n"); goto finally; } - journal_lock_updates(journal->j_journal); - status = journal_flush(journal->j_journal); - journal_unlock_updates(journal->j_journal); + jbd2_journal_lock_updates(journal->j_journal); + status = jbd2_journal_flush(journal->j_journal); + jbd2_journal_unlock_updates(journal->j_journal); if (status < 0) { up_write(&journal->j_trans_barrier); mlog_errno(status); goto finally; } - old_id = ocfs2_inc_trans_id(journal); + ocfs2_inc_trans_id(journal); flushed = atomic_read(&journal->j_num_trans); atomic_set(&journal->j_num_trans, 0); up_write(&journal->j_trans_barrier); - mlog(0, "commit_thread: flushed transaction %lu (%u handles)\n", - journal->j_trans_id, flushed); + trace_ocfs2_commit_cache_end(journal->j_trans_id, flushed); ocfs2_wake_downconvert_thread(osb); wake_up(&journal->j_checkpointed); finally: - mlog_exit(status); return status; } -/* pass it NULL and it will allocate a new handle object for you. If - * you pass it a handle however, it may still return error, in which - * case it has free'd the passed handle for you. */ handle_t *ocfs2_start_trans(struct ocfs2_super *osb, int max_buffs) { journal_t *journal = osb->journal->j_journal; @@ -125,17 +352,18 @@ handle_t *ocfs2_start_trans(struct ocfs2_super *osb, int max_buffs) BUG_ON(osb->journal->j_state == OCFS2_JOURNAL_FREE); BUG_ON(max_buffs <= 0); - /* JBD might support this, but our journalling code doesn't yet. */ - if (journal_current_handle()) { - mlog(ML_ERROR, "Recursive transaction attempted!\n"); - BUG(); - } + /* Nested transaction? Just return the handle... */ + if (journal_current_handle()) + return jbd2_journal_start(journal, max_buffs); + + sb_start_intwrite(osb->sb); down_read(&osb->journal->j_trans_barrier); - handle = journal_start(journal, max_buffs); + handle = jbd2_journal_start(journal, max_buffs); if (IS_ERR(handle)) { up_read(&osb->journal->j_trans_barrier); + sb_end_intwrite(osb->sb); mlog_errno(PTR_ERR(handle)); @@ -154,26 +382,28 @@ handle_t *ocfs2_start_trans(struct ocfs2_super *osb, int max_buffs) int ocfs2_commit_trans(struct ocfs2_super *osb, handle_t *handle) { - int ret; + int ret, nested; struct ocfs2_journal *journal = osb->journal; BUG_ON(!handle); - ret = journal_stop(handle); + nested = handle->h_ref > 1; + ret = jbd2_journal_stop(handle); if (ret < 0) mlog_errno(ret); - up_read(&journal->j_trans_barrier); + if (!nested) { + up_read(&journal->j_trans_barrier); + sb_end_intwrite(osb->sb); + } return ret; } /* - * 'nblocks' is what you want to add to the current - * transaction. extend_trans will either extend the current handle by - * nblocks, or commit it and start a new one with nblocks credits. + * 'nblocks' is what you want to add to the current transaction. * - * This might call journal_restart() which will commit dirty buffers + * This might call jbd2_journal_restart() which will commit dirty buffers * and then restart the transaction. Before calling * ocfs2_extend_trans(), any changed blocks should have been * dirtied. After calling it, all blocks which need to be changed must @@ -189,19 +419,22 @@ int ocfs2_commit_trans(struct ocfs2_super *osb, */ int ocfs2_extend_trans(handle_t *handle, int nblocks) { - int status; + int status, old_nblocks; BUG_ON(!handle); - BUG_ON(!nblocks); + BUG_ON(nblocks < 0); - mlog_entry_void(); + if (!nblocks) + return 0; - mlog(0, "Trying to extend transaction by %d blocks\n", nblocks); + old_nblocks = handle->h_buffer_credits; -#ifdef OCFS2_DEBUG_FS + trace_ocfs2_extend_trans(old_nblocks, nblocks); + +#ifdef CONFIG_OCFS2_DEBUG_FS status = 1; #else - status = journal_extend(handle, nblocks); + status = jbd2_journal_extend(handle, nblocks); if (status < 0) { mlog_errno(status); goto bail; @@ -209,8 +442,9 @@ int ocfs2_extend_trans(handle_t *handle, int nblocks) #endif if (status > 0) { - mlog(0, "journal_extend failed, trying journal_restart\n"); - status = journal_restart(handle, nblocks); + trace_ocfs2_extend_trans_restart(old_nblocks + nblocks); + status = jbd2_journal_restart(handle, + old_nblocks + nblocks); if (status < 0) { mlog_errno(status); goto bail; @@ -219,28 +453,211 @@ int ocfs2_extend_trans(handle_t *handle, int nblocks) status = 0; bail: + return status; +} + +/* + * If we have fewer than thresh credits, extend by OCFS2_MAX_TRANS_DATA. + * If that fails, restart the transaction & regain write access for the + * buffer head which is used for metadata modifications. + * Taken from Ext4: extend_or_restart_transaction() + */ +int ocfs2_allocate_extend_trans(handle_t *handle, int thresh) +{ + int status, old_nblks; + + BUG_ON(!handle); + + old_nblks = handle->h_buffer_credits; + trace_ocfs2_allocate_extend_trans(old_nblks, thresh); + + if (old_nblks < thresh) + return 0; + + status = jbd2_journal_extend(handle, OCFS2_MAX_TRANS_DATA); + if (status < 0) { + mlog_errno(status); + goto bail; + } + + if (status > 0) { + status = jbd2_journal_restart(handle, OCFS2_MAX_TRANS_DATA); + if (status < 0) + mlog_errno(status); + } - mlog_exit(status); +bail: return status; } -int ocfs2_journal_access(handle_t *handle, - struct inode *inode, - struct buffer_head *bh, - int type) + +struct ocfs2_triggers { + struct jbd2_buffer_trigger_type ot_triggers; + int ot_offset; +}; + +static inline struct ocfs2_triggers *to_ocfs2_trigger(struct jbd2_buffer_trigger_type *triggers) +{ + return container_of(triggers, struct ocfs2_triggers, ot_triggers); +} + +static void ocfs2_frozen_trigger(struct jbd2_buffer_trigger_type *triggers, + struct buffer_head *bh, + void *data, size_t size) +{ + struct ocfs2_triggers *ot = to_ocfs2_trigger(triggers); + + /* + * We aren't guaranteed to have the superblock here, so we + * must unconditionally compute the ecc data. + * __ocfs2_journal_access() will only set the triggers if + * metaecc is enabled. + */ + ocfs2_block_check_compute(data, size, data + ot->ot_offset); +} + +/* + * Quota blocks have their own trigger because the struct ocfs2_block_check + * offset depends on the blocksize. + */ +static void ocfs2_dq_frozen_trigger(struct jbd2_buffer_trigger_type *triggers, + struct buffer_head *bh, + void *data, size_t size) +{ + struct ocfs2_disk_dqtrailer *dqt = + ocfs2_block_dqtrailer(size, data); + + /* + * We aren't guaranteed to have the superblock here, so we + * must unconditionally compute the ecc data. + * __ocfs2_journal_access() will only set the triggers if + * metaecc is enabled. + */ + ocfs2_block_check_compute(data, size, &dqt->dq_check); +} + +/* + * Directory blocks also have their own trigger because the + * struct ocfs2_block_check offset depends on the blocksize. + */ +static void ocfs2_db_frozen_trigger(struct jbd2_buffer_trigger_type *triggers, + struct buffer_head *bh, + void *data, size_t size) +{ + struct ocfs2_dir_block_trailer *trailer = + ocfs2_dir_trailer_from_size(size, data); + + /* + * We aren't guaranteed to have the superblock here, so we + * must unconditionally compute the ecc data. + * __ocfs2_journal_access() will only set the triggers if + * metaecc is enabled. + */ + ocfs2_block_check_compute(data, size, &trailer->db_check); +} + +static void ocfs2_abort_trigger(struct jbd2_buffer_trigger_type *triggers, + struct buffer_head *bh) +{ + mlog(ML_ERROR, + "ocfs2_abort_trigger called by JBD2. bh = 0x%lx, " + "bh->b_blocknr = %llu\n", + (unsigned long)bh, + (unsigned long long)bh->b_blocknr); + + /* We aren't guaranteed to have the superblock here - but if we + * don't, it'll just crash. */ + ocfs2_error(bh->b_assoc_map->host->i_sb, + "JBD2 has aborted our journal, ocfs2 cannot continue\n"); +} + +static struct ocfs2_triggers di_triggers = { + .ot_triggers = { + .t_frozen = ocfs2_frozen_trigger, + .t_abort = ocfs2_abort_trigger, + }, + .ot_offset = offsetof(struct ocfs2_dinode, i_check), +}; + +static struct ocfs2_triggers eb_triggers = { + .ot_triggers = { + .t_frozen = ocfs2_frozen_trigger, + .t_abort = ocfs2_abort_trigger, + }, + .ot_offset = offsetof(struct ocfs2_extent_block, h_check), +}; + +static struct ocfs2_triggers rb_triggers = { + .ot_triggers = { + .t_frozen = ocfs2_frozen_trigger, + .t_abort = ocfs2_abort_trigger, + }, + .ot_offset = offsetof(struct ocfs2_refcount_block, rf_check), +}; + +static struct ocfs2_triggers gd_triggers = { + .ot_triggers = { + .t_frozen = ocfs2_frozen_trigger, + .t_abort = ocfs2_abort_trigger, + }, + .ot_offset = offsetof(struct ocfs2_group_desc, bg_check), +}; + +static struct ocfs2_triggers db_triggers = { + .ot_triggers = { + .t_frozen = ocfs2_db_frozen_trigger, + .t_abort = ocfs2_abort_trigger, + }, +}; + +static struct ocfs2_triggers xb_triggers = { + .ot_triggers = { + .t_frozen = ocfs2_frozen_trigger, + .t_abort = ocfs2_abort_trigger, + }, + .ot_offset = offsetof(struct ocfs2_xattr_block, xb_check), +}; + +static struct ocfs2_triggers dq_triggers = { + .ot_triggers = { + .t_frozen = ocfs2_dq_frozen_trigger, + .t_abort = ocfs2_abort_trigger, + }, +}; + +static struct ocfs2_triggers dr_triggers = { + .ot_triggers = { + .t_frozen = ocfs2_frozen_trigger, + .t_abort = ocfs2_abort_trigger, + }, + .ot_offset = offsetof(struct ocfs2_dx_root_block, dr_check), +}; + +static struct ocfs2_triggers dl_triggers = { + .ot_triggers = { + .t_frozen = ocfs2_frozen_trigger, + .t_abort = ocfs2_abort_trigger, + }, + .ot_offset = offsetof(struct ocfs2_dx_leaf, dl_check), +}; + +static int __ocfs2_journal_access(handle_t *handle, + struct ocfs2_caching_info *ci, + struct buffer_head *bh, + struct ocfs2_triggers *triggers, + int type) { int status; + struct ocfs2_super *osb = + OCFS2_SB(ocfs2_metadata_cache_get_super(ci)); - BUG_ON(!inode); + BUG_ON(!ci || !ci->ci_ops); BUG_ON(!handle); BUG_ON(!bh); - mlog_entry("bh->b_blocknr=%llu, type=%d (\"%s\"), bh->b_size = %zu\n", - (unsigned long long)bh->b_blocknr, type, - (type == OCFS2_JOURNAL_ACCESS_CREATE) ? - "OCFS2_JOURNAL_ACCESS_CREATE" : - "OCFS2_JOURNAL_ACCESS_WRITE", - bh->b_size); + trace_ocfs2_journal_access( + (unsigned long long)ocfs2_metadata_cache_owner(ci), + (unsigned long long)bh->b_blocknr, type, bh->b_size); /* we can safely remove this assertion after testing. */ if (!buffer_uptodate(bh)) { @@ -250,70 +667,112 @@ int ocfs2_journal_access(handle_t *handle, BUG(); } - /* Set the current transaction information on the inode so + /* Set the current transaction information on the ci so * that the locking code knows whether it can drop it's locks - * on this inode or not. We're protected from the commit + * on this ci or not. We're protected from the commit * thread updating the current transaction id until * ocfs2_commit_trans() because ocfs2_start_trans() took * j_trans_barrier for us. */ - ocfs2_set_inode_lock_trans(OCFS2_SB(inode->i_sb)->journal, inode); + ocfs2_set_ci_lock_trans(osb->journal, ci); - mutex_lock(&OCFS2_I(inode)->ip_io_mutex); + ocfs2_metadata_cache_io_lock(ci); switch (type) { case OCFS2_JOURNAL_ACCESS_CREATE: case OCFS2_JOURNAL_ACCESS_WRITE: - status = journal_get_write_access(handle, bh); + status = jbd2_journal_get_write_access(handle, bh); break; case OCFS2_JOURNAL_ACCESS_UNDO: - status = journal_get_undo_access(handle, bh); + status = jbd2_journal_get_undo_access(handle, bh); break; default: status = -EINVAL; - mlog(ML_ERROR, "Uknown access type!\n"); + mlog(ML_ERROR, "Unknown access type!\n"); } - mutex_unlock(&OCFS2_I(inode)->ip_io_mutex); + if (!status && ocfs2_meta_ecc(osb) && triggers) + jbd2_journal_set_triggers(bh, &triggers->ot_triggers); + ocfs2_metadata_cache_io_unlock(ci); if (status < 0) mlog(ML_ERROR, "Error %d getting %d access to buffer!\n", status, type); - mlog_exit(status); return status; } -int ocfs2_journal_dirty(handle_t *handle, - struct buffer_head *bh) +int ocfs2_journal_access_di(handle_t *handle, struct ocfs2_caching_info *ci, + struct buffer_head *bh, int type) { - int status; + return __ocfs2_journal_access(handle, ci, bh, &di_triggers, type); +} + +int ocfs2_journal_access_eb(handle_t *handle, struct ocfs2_caching_info *ci, + struct buffer_head *bh, int type) +{ + return __ocfs2_journal_access(handle, ci, bh, &eb_triggers, type); +} - mlog_entry("(bh->b_blocknr=%llu)\n", - (unsigned long long)bh->b_blocknr); +int ocfs2_journal_access_rb(handle_t *handle, struct ocfs2_caching_info *ci, + struct buffer_head *bh, int type) +{ + return __ocfs2_journal_access(handle, ci, bh, &rb_triggers, + type); +} - status = journal_dirty_metadata(handle, bh); - if (status < 0) - mlog(ML_ERROR, "Could not dirty metadata buffer. " - "(bh->b_blocknr=%llu)\n", - (unsigned long long)bh->b_blocknr); +int ocfs2_journal_access_gd(handle_t *handle, struct ocfs2_caching_info *ci, + struct buffer_head *bh, int type) +{ + return __ocfs2_journal_access(handle, ci, bh, &gd_triggers, type); +} - mlog_exit(status); - return status; +int ocfs2_journal_access_db(handle_t *handle, struct ocfs2_caching_info *ci, + struct buffer_head *bh, int type) +{ + return __ocfs2_journal_access(handle, ci, bh, &db_triggers, type); } -int ocfs2_journal_dirty_data(handle_t *handle, - struct buffer_head *bh) +int ocfs2_journal_access_xb(handle_t *handle, struct ocfs2_caching_info *ci, + struct buffer_head *bh, int type) { - int err = journal_dirty_data(handle, bh); - if (err) - mlog_errno(err); - /* TODO: When we can handle it, abort the handle and go RO on - * error here. */ + return __ocfs2_journal_access(handle, ci, bh, &xb_triggers, type); +} - return err; +int ocfs2_journal_access_dq(handle_t *handle, struct ocfs2_caching_info *ci, + struct buffer_head *bh, int type) +{ + return __ocfs2_journal_access(handle, ci, bh, &dq_triggers, type); } -#define OCFS2_DEFAULT_COMMIT_INTERVAL (HZ * JBD_DEFAULT_MAX_COMMIT_AGE) +int ocfs2_journal_access_dr(handle_t *handle, struct ocfs2_caching_info *ci, + struct buffer_head *bh, int type) +{ + return __ocfs2_journal_access(handle, ci, bh, &dr_triggers, type); +} + +int ocfs2_journal_access_dl(handle_t *handle, struct ocfs2_caching_info *ci, + struct buffer_head *bh, int type) +{ + return __ocfs2_journal_access(handle, ci, bh, &dl_triggers, type); +} + +int ocfs2_journal_access(handle_t *handle, struct ocfs2_caching_info *ci, + struct buffer_head *bh, int type) +{ + return __ocfs2_journal_access(handle, ci, bh, NULL, type); +} + +void ocfs2_journal_dirty(handle_t *handle, struct buffer_head *bh) +{ + int status; + + trace_ocfs2_journal_dirty((unsigned long long)bh->b_blocknr); + + status = jbd2_journal_dirty_metadata(handle, bh); + BUG_ON(status); +} + +#define OCFS2_DEFAULT_COMMIT_INTERVAL (HZ * JBD2_DEFAULT_MAX_COMMIT_AGE) void ocfs2_set_journal_params(struct ocfs2_super *osb) { @@ -323,13 +782,13 @@ void ocfs2_set_journal_params(struct ocfs2_super *osb) if (osb->osb_commit_interval) commit_interval = osb->osb_commit_interval; - spin_lock(&journal->j_state_lock); + write_lock(&journal->j_state_lock); journal->j_commit_interval = commit_interval; if (osb->s_mount_opt & OCFS2_MOUNT_BARRIER) - journal->j_flags |= JFS_BARRIER; + journal->j_flags |= JBD2_BARRIER; else - journal->j_flags &= ~JFS_BARRIER; - spin_unlock(&journal->j_state_lock); + journal->j_flags &= ~JBD2_BARRIER; + write_unlock(&journal->j_state_lock); } int ocfs2_journal_init(struct ocfs2_journal *journal, int *dirty) @@ -342,8 +801,6 @@ int ocfs2_journal_init(struct ocfs2_journal *journal, int *dirty) struct ocfs2_super *osb; int inode_lock = 0; - mlog_entry_void(); - BUG_ON(!journal); osb = journal->j_osb; @@ -380,28 +837,26 @@ int ocfs2_journal_init(struct ocfs2_journal *journal, int *dirty) inode_lock = 1; di = (struct ocfs2_dinode *)bh->b_data; - if (inode->i_size < OCFS2_MIN_JOURNAL_SIZE) { + if (i_size_read(inode) < OCFS2_MIN_JOURNAL_SIZE) { mlog(ML_ERROR, "Journal file size (%lld) is too small!\n", - inode->i_size); + i_size_read(inode)); status = -EINVAL; goto done; } - mlog(0, "inode->i_size = %lld\n", inode->i_size); - mlog(0, "inode->i_blocks = %llu\n", - (unsigned long long)inode->i_blocks); - mlog(0, "inode->ip_clusters = %u\n", OCFS2_I(inode)->ip_clusters); + trace_ocfs2_journal_init(i_size_read(inode), + (unsigned long long)inode->i_blocks, + OCFS2_I(inode)->ip_clusters); /* call the kernels journal init function now */ - j_journal = journal_init_inode(inode); + j_journal = jbd2_journal_init_inode(inode); if (j_journal == NULL) { mlog(ML_ERROR, "Linux journal layer error\n"); status = -EINVAL; goto done; } - mlog(0, "Returned from journal_init_inode\n"); - mlog(0, "j_journal->j_maxlen = %u\n", j_journal->j_maxlen); + trace_ocfs2_journal_init_maxlen(j_journal->j_maxlen); *dirty = (le32_to_cpu(di->id1.journal1.ij_flags) & OCFS2_JOURNAL_DIRTY_FL); @@ -419,20 +874,28 @@ done: if (status < 0) { if (inode_lock) ocfs2_inode_unlock(inode, 1); - if (bh != NULL) - brelse(bh); + brelse(bh); if (inode) { OCFS2_I(inode)->ip_open_count--; iput(inode); } } - mlog_exit(status); return status; } +static void ocfs2_bump_recovery_generation(struct ocfs2_dinode *di) +{ + le32_add_cpu(&(di->id1.journal1.ij_recovery_generation), 1); +} + +static u32 ocfs2_get_recovery_generation(struct ocfs2_dinode *di) +{ + return le32_to_cpu(di->id1.journal1.ij_recovery_generation); +} + static int ocfs2_journal_toggle_dirty(struct ocfs2_super *osb, - int dirty) + int dirty, int replayed) { int status; unsigned int flags; @@ -440,20 +903,12 @@ static int ocfs2_journal_toggle_dirty(struct ocfs2_super *osb, struct buffer_head *bh = journal->j_bh; struct ocfs2_dinode *fe; - mlog_entry_void(); - fe = (struct ocfs2_dinode *)bh->b_data; - if (!OCFS2_IS_VALID_DINODE(fe)) { - /* This is called from startup/shutdown which will - * handle the errors in a specific manner, so no need - * to call ocfs2_error() here. */ - mlog(ML_ERROR, "Journal dinode %llu has invalid " - "signature: %.*s", - (unsigned long long)le64_to_cpu(fe->i_blkno), 7, - fe->i_signature); - status = -EIO; - goto out; - } + + /* The journal bh on the osb always comes from ocfs2_journal_init() + * and was validated there inside ocfs2_inode_lock_full(). It's a + * code bug if we mess it up. */ + BUG_ON(!OCFS2_IS_VALID_DINODE(fe)); flags = le32_to_cpu(fe->id1.journal1.ij_flags); if (dirty) @@ -462,12 +917,14 @@ static int ocfs2_journal_toggle_dirty(struct ocfs2_super *osb, flags &= ~OCFS2_JOURNAL_DIRTY_FL; fe->id1.journal1.ij_flags = cpu_to_le32(flags); - status = ocfs2_write_block(osb, bh, journal->j_inode); + if (replayed) + ocfs2_bump_recovery_generation(fe); + + ocfs2_compute_meta_ecc(osb->sb, bh->b_data, &fe->i_check); + status = ocfs2_write_block(osb, bh, INODE_CACHE(journal->j_inode)); if (status < 0) mlog_errno(status); -out: - mlog_exit(status); return status; } @@ -482,8 +939,6 @@ void ocfs2_journal_shutdown(struct ocfs2_super *osb) struct inode *inode = NULL; int num_running_trans = 0; - mlog_entry_void(); - BUG_ON(!osb); journal = osb->journal; @@ -495,15 +950,12 @@ void ocfs2_journal_shutdown(struct ocfs2_super *osb) if (journal->j_state != OCFS2_JOURNAL_LOADED) goto done; - /* need to inc inode use count as journal_destroy will iput. */ + /* need to inc inode use count - jbd2_journal_destroy will iput. */ if (!igrab(inode)) BUG(); num_running_trans = atomic_read(&(osb->journal->j_num_trans)); - if (num_running_trans > 0) - mlog(0, "Shutting down journal: must wait on %d " - "running transactions!\n", - num_running_trans); + trace_ocfs2_journal_shutdown(num_running_trans); /* Do a commit_cache here. It will flush our journal, *and* * release any locks that are still held. @@ -516,7 +968,7 @@ void ocfs2_journal_shutdown(struct ocfs2_super *osb) * completely destroy the journal. */ if (osb->commit_task) { /* Wait for the commit thread */ - mlog(0, "Waiting for ocfs2commit to exit....\n"); + trace_ocfs2_journal_shutdown_wait(osb->commit_task); kthread_stop(osb->commit_task); osb->commit_task = NULL; } @@ -524,9 +976,9 @@ void ocfs2_journal_shutdown(struct ocfs2_super *osb) BUG_ON(atomic_read(&(osb->journal->j_num_trans)) != 0); if (ocfs2_mount_local(osb)) { - journal_lock_updates(journal->j_journal); - status = journal_flush(journal->j_journal); - journal_unlock_updates(journal->j_journal); + jbd2_journal_lock_updates(journal->j_journal); + status = jbd2_journal_flush(journal->j_journal); + jbd2_journal_unlock_updates(journal->j_journal); if (status < 0) mlog_errno(status); } @@ -536,13 +988,14 @@ void ocfs2_journal_shutdown(struct ocfs2_super *osb) * Do not toggle if flush was unsuccessful otherwise * will leave dirty metadata in a "clean" journal */ - status = ocfs2_journal_toggle_dirty(osb, 0); + status = ocfs2_journal_toggle_dirty(osb, 0, 0); if (status < 0) mlog_errno(status); } /* Shutdown the kernel journal system */ - journal_destroy(journal->j_journal); + jbd2_journal_destroy(journal->j_journal); + journal->j_journal = NULL; OCFS2_I(inode)->ip_open_count--; @@ -558,7 +1011,6 @@ void ocfs2_journal_shutdown(struct ocfs2_super *osb) done: if (inode) iput(inode); - mlog_exit_void(); } static void ocfs2_clear_journal_error(struct super_block *sb, @@ -567,31 +1019,28 @@ static void ocfs2_clear_journal_error(struct super_block *sb, { int olderr; - olderr = journal_errno(journal); + olderr = jbd2_journal_errno(journal); if (olderr) { mlog(ML_ERROR, "File system error %d recorded in " "journal %u.\n", olderr, slot); mlog(ML_ERROR, "File system on device %s needs checking.\n", sb->s_id); - journal_ack_err(journal); - journal_clear_err(journal); + jbd2_journal_ack_err(journal); + jbd2_journal_clear_err(journal); } } -int ocfs2_journal_load(struct ocfs2_journal *journal, int local) +int ocfs2_journal_load(struct ocfs2_journal *journal, int local, int replayed) { int status = 0; struct ocfs2_super *osb; - mlog_entry_void(); - - if (!journal) - BUG(); + BUG_ON(!journal); osb = journal->j_osb; - status = journal_load(journal->j_journal); + status = jbd2_journal_load(journal->j_journal); if (status < 0) { mlog(ML_ERROR, "Failed to load journal!\n"); goto done; @@ -599,7 +1048,7 @@ int ocfs2_journal_load(struct ocfs2_journal *journal, int local) ocfs2_clear_journal_error(osb->sb, journal->j_journal, osb->slot_num); - status = ocfs2_journal_toggle_dirty(osb, 1); + status = ocfs2_journal_toggle_dirty(osb, 1, replayed); if (status < 0) { mlog_errno(status); goto done; @@ -620,7 +1069,6 @@ int ocfs2_journal_load(struct ocfs2_journal *journal, int local) osb->commit_task = NULL; done: - mlog_exit(status); return status; } @@ -631,25 +1079,39 @@ int ocfs2_journal_wipe(struct ocfs2_journal *journal, int full) { int status; - mlog_entry_void(); - BUG_ON(!journal); - status = journal_wipe(journal->j_journal, full); + status = jbd2_journal_wipe(journal->j_journal, full); if (status < 0) { mlog_errno(status); goto bail; } - status = ocfs2_journal_toggle_dirty(journal->j_osb, 0); + status = ocfs2_journal_toggle_dirty(journal->j_osb, 0, 0); if (status < 0) mlog_errno(status); bail: - mlog_exit(status); return status; } +static int ocfs2_recovery_completed(struct ocfs2_super *osb) +{ + int empty; + struct ocfs2_recovery_map *rm = osb->recovery_map; + + spin_lock(&osb->osb_lock); + empty = (rm->rm_used == 0); + spin_unlock(&osb->osb_lock); + + return empty; +} + +void ocfs2_wait_for_recovery(struct ocfs2_super *osb) +{ + wait_event(osb->recovery_event, ocfs2_recovery_completed(osb)); +} + /* * JBD Might read a cached version of another nodes journal file. We * don't want this as this file changes often and we get no @@ -668,11 +1130,9 @@ static int ocfs2_force_read_journal(struct inode *inode) #define CONCURRENT_JOURNAL_FILL 32ULL struct buffer_head *bhs[CONCURRENT_JOURNAL_FILL]; - mlog_entry_void(); - memset(bhs, 0, sizeof(struct buffer_head *) * CONCURRENT_JOURNAL_FILL); - num_blocks = ocfs2_blocks_for_bytes(inode->i_sb, inode->i_size); + num_blocks = ocfs2_blocks_for_bytes(inode->i_sb, i_size_read(inode)); v_blkno = 0; while (v_blkno < num_blocks) { status = ocfs2_extent_map_get_blocks(inode, v_blkno, @@ -687,9 +1147,8 @@ static int ocfs2_force_read_journal(struct inode *inode) /* We are reading journal data which should not * be put in the uptodate cache */ - status = ocfs2_read_blocks(OCFS2_SB(inode->i_sb), - p_blkno, p_blocks, bhs, 0, - NULL); + status = ocfs2_read_blocks_sync(OCFS2_SB(inode->i_sb), + p_blkno, p_blocks, bhs); if (status < 0) { mlog_errno(status); goto bail; @@ -705,9 +1164,7 @@ static int ocfs2_force_read_journal(struct inode *inode) bail: for(i = 0; i < CONCURRENT_JOURNAL_FILL; i++) - if (bhs[i]) - brelse(bhs[i]); - mlog_exit(status); + brelse(bhs[i]); return status; } @@ -716,6 +1173,7 @@ struct ocfs2_la_recovery_item { int lri_slot; struct ocfs2_dinode *lri_la_dinode; struct ocfs2_dinode *lri_tl_dinode; + struct ocfs2_quota_recovery *lri_qrec; }; /* Does the second half of the recovery process. By this point, the @@ -730,17 +1188,17 @@ struct ocfs2_la_recovery_item { */ void ocfs2_complete_recovery(struct work_struct *work) { - int ret; + int ret = 0; struct ocfs2_journal *journal = container_of(work, struct ocfs2_journal, j_recovery_work); struct ocfs2_super *osb = journal->j_osb; struct ocfs2_dinode *la_dinode, *tl_dinode; struct ocfs2_la_recovery_item *item, *n; + struct ocfs2_quota_recovery *qrec; LIST_HEAD(tmp_la_list); - mlog_entry_void(); - - mlog(0, "completing recovery from keventd\n"); + trace_ocfs2_complete_recovery( + (unsigned long long)OCFS2_I(journal->j_inode)->ip_blkno); spin_lock(&journal->j_lock); list_splice_init(&journal->j_la_cleanups, &tmp_la_list); @@ -749,13 +1207,18 @@ void ocfs2_complete_recovery(struct work_struct *work) list_for_each_entry_safe(item, n, &tmp_la_list, lri_list) { list_del_init(&item->lri_list); - mlog(0, "Complete recovery for slot %d\n", item->lri_slot); + ocfs2_wait_on_quotas(osb); la_dinode = item->lri_la_dinode; - if (la_dinode) { - mlog(0, "Clean up local alloc %llu\n", - (unsigned long long)le64_to_cpu(la_dinode->i_blkno)); + tl_dinode = item->lri_tl_dinode; + qrec = item->lri_qrec; + + trace_ocfs2_complete_recovery_slot(item->lri_slot, + la_dinode ? le64_to_cpu(la_dinode->i_blkno) : 0, + tl_dinode ? le64_to_cpu(tl_dinode->i_blkno) : 0, + qrec); + if (la_dinode) { ret = ocfs2_complete_local_alloc_recovery(osb, la_dinode); if (ret < 0) @@ -764,11 +1227,7 @@ void ocfs2_complete_recovery(struct work_struct *work) kfree(la_dinode); } - tl_dinode = item->lri_tl_dinode; if (tl_dinode) { - mlog(0, "Clean up truncate log %llu\n", - (unsigned long long)le64_to_cpu(tl_dinode->i_blkno)); - ret = ocfs2_complete_truncate_log_recovery(osb, tl_dinode); if (ret < 0) @@ -781,11 +1240,18 @@ void ocfs2_complete_recovery(struct work_struct *work) if (ret < 0) mlog_errno(ret); + if (qrec) { + ret = ocfs2_finish_quota_recovery(osb, qrec, + item->lri_slot); + if (ret < 0) + mlog_errno(ret); + /* Recovery info is already freed now */ + } + kfree(item); } - mlog(0, "Recovery completion\n"); - mlog_exit_void(); + trace_ocfs2_complete_recovery_end(ret); } /* NOTE: This function always eats your references to la_dinode and @@ -794,7 +1260,8 @@ void ocfs2_complete_recovery(struct work_struct *work) static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal, int slot_num, struct ocfs2_dinode *la_dinode, - struct ocfs2_dinode *tl_dinode) + struct ocfs2_dinode *tl_dinode, + struct ocfs2_quota_recovery *qrec) { struct ocfs2_la_recovery_item *item; @@ -803,11 +1270,11 @@ static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal, /* Though we wish to avoid it, we are in fact safe in * skipping local alloc cleanup as fsck.ocfs2 is more * than capable of reclaiming unused space. */ - if (la_dinode) - kfree(la_dinode); + kfree(la_dinode); + kfree(tl_dinode); - if (tl_dinode) - kfree(tl_dinode); + if (qrec) + ocfs2_free_quota_recovery(qrec); mlog_errno(-ENOMEM); return; @@ -817,6 +1284,7 @@ static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal, item->lri_la_dinode = la_dinode; item->lri_slot = slot_num; item->lri_tl_dinode = tl_dinode; + item->lri_qrec = qrec; spin_lock(&journal->j_lock); list_add_tail(&item->lri_list, &journal->j_la_cleanups); @@ -825,37 +1293,60 @@ static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal, } /* Called by the mount code to queue recovery the last part of - * recovery for it's own slot. */ + * recovery for it's own and offline slot(s). */ void ocfs2_complete_mount_recovery(struct ocfs2_super *osb) { struct ocfs2_journal *journal = osb->journal; - if (osb->dirty) { - /* No need to queue up our truncate_log as regular - * cleanup will catch that. */ - ocfs2_queue_recovery_completion(journal, - osb->slot_num, - osb->local_alloc_copy, - NULL); - ocfs2_schedule_truncate_log_flush(osb, 0); + if (ocfs2_is_hard_readonly(osb)) + return; + + /* No need to queue up our truncate_log as regular cleanup will catch + * that */ + ocfs2_queue_recovery_completion(journal, osb->slot_num, + osb->local_alloc_copy, NULL, NULL); + ocfs2_schedule_truncate_log_flush(osb, 0); + + osb->local_alloc_copy = NULL; + osb->dirty = 0; + + /* queue to recover orphan slots for all offline slots */ + ocfs2_replay_map_set_state(osb, REPLAY_NEEDED); + ocfs2_queue_replay_slots(osb); + ocfs2_free_replay_slots(osb); +} - osb->local_alloc_copy = NULL; - osb->dirty = 0; +void ocfs2_complete_quota_recovery(struct ocfs2_super *osb) +{ + if (osb->quota_rec) { + ocfs2_queue_recovery_completion(osb->journal, + osb->slot_num, + NULL, + NULL, + osb->quota_rec); + osb->quota_rec = NULL; } } static int __ocfs2_recovery_thread(void *arg) { - int status, node_num; + int status, node_num, slot_num; struct ocfs2_super *osb = arg; - - mlog_entry_void(); + struct ocfs2_recovery_map *rm = osb->recovery_map; + int *rm_quota = NULL; + int rm_quota_used = 0, i; + struct ocfs2_quota_recovery *qrec; status = ocfs2_wait_on_mount(osb); if (status < 0) { goto bail; } + rm_quota = kzalloc(osb->max_slots * sizeof(int), GFP_NOFS); + if (!rm_quota) { + status = -ENOMEM; + goto bail; + } restart: status = ocfs2_super_lock(osb, 1); if (status < 0) { @@ -863,49 +1354,95 @@ restart: goto bail; } - while(!ocfs2_node_map_is_empty(osb, &osb->recovery_map)) { - node_num = ocfs2_node_map_first_set_bit(osb, - &osb->recovery_map); - if (node_num == O2NM_INVALID_NODE_NUM) { - mlog(0, "Out of nodes to recover.\n"); - break; + status = ocfs2_compute_replay_slots(osb); + if (status < 0) + mlog_errno(status); + + /* queue recovery for our own slot */ + ocfs2_queue_recovery_completion(osb->journal, osb->slot_num, NULL, + NULL, NULL); + + spin_lock(&osb->osb_lock); + while (rm->rm_used) { + /* It's always safe to remove entry zero, as we won't + * clear it until ocfs2_recover_node() has succeeded. */ + node_num = rm->rm_entries[0]; + spin_unlock(&osb->osb_lock); + slot_num = ocfs2_node_num_to_slot(osb, node_num); + trace_ocfs2_recovery_thread_node(node_num, slot_num); + if (slot_num == -ENOENT) { + status = 0; + goto skip_recovery; } - status = ocfs2_recover_node(osb, node_num); - if (status < 0) { + /* It is a bit subtle with quota recovery. We cannot do it + * immediately because we have to obtain cluster locks from + * quota files and we also don't want to just skip it because + * then quota usage would be out of sync until some node takes + * the slot. So we remember which nodes need quota recovery + * and when everything else is done, we recover quotas. */ + for (i = 0; i < rm_quota_used && rm_quota[i] != slot_num; i++); + if (i == rm_quota_used) + rm_quota[rm_quota_used++] = slot_num; + + status = ocfs2_recover_node(osb, node_num, slot_num); +skip_recovery: + if (!status) { + ocfs2_recovery_map_clear(osb, node_num); + } else { mlog(ML_ERROR, "Error %d recovering node %d on device (%u,%u)!\n", status, node_num, MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev)); mlog(ML_ERROR, "Volume requires unmount.\n"); - continue; } - ocfs2_recovery_map_clear(osb, node_num); + spin_lock(&osb->osb_lock); + } + spin_unlock(&osb->osb_lock); + trace_ocfs2_recovery_thread_end(status); + + /* Refresh all journal recovery generations from disk */ + status = ocfs2_check_journals_nolocks(osb); + status = (status == -EROFS) ? 0 : status; + if (status < 0) + mlog_errno(status); + + /* Now it is right time to recover quotas... We have to do this under + * superblock lock so that no one can start using the slot (and crash) + * before we recover it */ + for (i = 0; i < rm_quota_used; i++) { + qrec = ocfs2_begin_quota_recovery(osb, rm_quota[i]); + if (IS_ERR(qrec)) { + status = PTR_ERR(qrec); + mlog_errno(status); + continue; + } + ocfs2_queue_recovery_completion(osb->journal, rm_quota[i], + NULL, NULL, qrec); } + ocfs2_super_unlock(osb, 1); - /* We always run recovery on our own orphan dir - the dead - * node(s) may have disallowd a previos inode delete. Re-processing - * is therefore required. */ - ocfs2_queue_recovery_completion(osb->journal, osb->slot_num, NULL, - NULL); + /* queue recovery for offline slots */ + ocfs2_queue_replay_slots(osb); bail: mutex_lock(&osb->recovery_lock); - if (!status && - !ocfs2_node_map_is_empty(osb, &osb->recovery_map)) { + if (!status && !ocfs2_recovery_completed(osb)) { mutex_unlock(&osb->recovery_lock); goto restart; } + ocfs2_free_replay_slots(osb); osb->recovery_thread_task = NULL; mb(); /* sync with ocfs2_recovery_thread_running */ wake_up(&osb->recovery_event); mutex_unlock(&osb->recovery_lock); - mlog_exit(status); + kfree(rm_quota); + /* no one is callint kthread_stop() for us so the kthread() api * requires that we call do_exit(). And it isn't exported, but * complete_and_exit() seems to be a minimal wrapper around it. */ @@ -915,19 +1452,15 @@ bail: void ocfs2_recovery_thread(struct ocfs2_super *osb, int node_num) { - mlog_entry("(node_num=%d, osb->node_num = %d)\n", - node_num, osb->node_num); - mutex_lock(&osb->recovery_lock); - if (osb->disable_recovery) - goto out; - /* People waiting on recovery will wait on - * the recovery map to empty. */ - if (!ocfs2_recovery_map_set(osb, node_num)) - mlog(0, "node %d already be in recovery.\n", node_num); + trace_ocfs2_recovery_thread(node_num, osb->node_num, + osb->disable_recovery, osb->recovery_thread_task, + osb->disable_recovery ? + -1 : ocfs2_recovery_map_set(osb, node_num)); - mlog(0, "starting recovery thread...\n"); + if (osb->disable_recovery) + goto out; if (osb->recovery_thread_task) goto out; @@ -942,8 +1475,42 @@ void ocfs2_recovery_thread(struct ocfs2_super *osb, int node_num) out: mutex_unlock(&osb->recovery_lock); wake_up(&osb->recovery_event); +} + +static int ocfs2_read_journal_inode(struct ocfs2_super *osb, + int slot_num, + struct buffer_head **bh, + struct inode **ret_inode) +{ + int status = -EACCES; + struct inode *inode = NULL; + + BUG_ON(slot_num >= osb->max_slots); + + inode = ocfs2_get_system_file_inode(osb, JOURNAL_SYSTEM_INODE, + slot_num); + if (!inode || is_bad_inode(inode)) { + mlog_errno(status); + goto bail; + } + SET_INODE_JOURNAL(inode); + + status = ocfs2_read_inode_block_full(inode, bh, OCFS2_BH_IGNORE_CACHE); + if (status < 0) { + mlog_errno(status); + goto bail; + } + + status = 0; - mlog_exit_void(); +bail: + if (inode) { + if (status || !ret_inode) + iput(inode); + else + *ret_inode = inode; + } + return status; } /* Does the actual journal replay and marks the journal inode as @@ -959,26 +1526,40 @@ static int ocfs2_replay_journal(struct ocfs2_super *osb, struct ocfs2_dinode *fe; journal_t *journal = NULL; struct buffer_head *bh = NULL; + u32 slot_reco_gen; - inode = ocfs2_get_system_file_inode(osb, JOURNAL_SYSTEM_INODE, - slot_num); - if (inode == NULL) { - status = -EACCES; + status = ocfs2_read_journal_inode(osb, slot_num, &bh, &inode); + if (status) { mlog_errno(status); goto done; } - if (is_bad_inode(inode)) { - status = -EACCES; - iput(inode); - inode = NULL; - mlog_errno(status); + + fe = (struct ocfs2_dinode *)bh->b_data; + slot_reco_gen = ocfs2_get_recovery_generation(fe); + brelse(bh); + bh = NULL; + + /* + * As the fs recovery is asynchronous, there is a small chance that + * another node mounted (and recovered) the slot before the recovery + * thread could get the lock. To handle that, we dirty read the journal + * inode for that slot to get the recovery generation. If it is + * different than what we expected, the slot has been recovered. + * If not, it needs recovery. + */ + if (osb->slot_recovery_generations[slot_num] != slot_reco_gen) { + trace_ocfs2_replay_journal_recovered(slot_num, + osb->slot_recovery_generations[slot_num], slot_reco_gen); + osb->slot_recovery_generations[slot_num] = slot_reco_gen; + status = -EBUSY; goto done; } - SET_INODE_JOURNAL(inode); + + /* Continue with recovery as the journal has not yet been recovered */ status = ocfs2_inode_lock_full(inode, &bh, 1, OCFS2_META_LOCK_RECOVERY); if (status < 0) { - mlog(0, "status returned from ocfs2_inode_lock=%d\n", status); + trace_ocfs2_replay_journal_lock_err(status); if (status != -ERESTARTSYS) mlog(ML_ERROR, "Could not lock journal!\n"); goto done; @@ -988,15 +1569,21 @@ static int ocfs2_replay_journal(struct ocfs2_super *osb, fe = (struct ocfs2_dinode *) bh->b_data; flags = le32_to_cpu(fe->id1.journal1.ij_flags); + slot_reco_gen = ocfs2_get_recovery_generation(fe); if (!(flags & OCFS2_JOURNAL_DIRTY_FL)) { - mlog(0, "No recovery required for node %d\n", node_num); + trace_ocfs2_replay_journal_skip(node_num); + /* Refresh recovery generation for the slot */ + osb->slot_recovery_generations[slot_num] = slot_reco_gen; goto done; } - mlog(ML_NOTICE, "Recovering node %d from slot %d on device (%u,%u)\n", - node_num, slot_num, - MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev)); + /* we need to run complete recovery for offline orphan slots */ + ocfs2_replay_map_set_state(osb, REPLAY_NEEDED); + + printk(KERN_NOTICE "ocfs2: Begin replay journal (node %d, slot %d) on "\ + "device (%u,%u)\n", node_num, slot_num, MAJOR(osb->sb->s_dev), + MINOR(osb->sb->s_dev)); OCFS2_I(inode)->ip_clusters = le32_to_cpu(fe->i_clusters); @@ -1006,30 +1593,28 @@ static int ocfs2_replay_journal(struct ocfs2_super *osb, goto done; } - mlog(0, "calling journal_init_inode\n"); - journal = journal_init_inode(inode); + journal = jbd2_journal_init_inode(inode); if (journal == NULL) { mlog(ML_ERROR, "Linux journal layer error\n"); status = -EIO; goto done; } - status = journal_load(journal); + status = jbd2_journal_load(journal); if (status < 0) { mlog_errno(status); if (!igrab(inode)) BUG(); - journal_destroy(journal); + jbd2_journal_destroy(journal); goto done; } ocfs2_clear_journal_error(osb->sb, journal, slot_num); /* wipe the journal */ - mlog(0, "flushing the journal.\n"); - journal_lock_updates(journal); - status = journal_flush(journal); - journal_unlock_updates(journal); + jbd2_journal_lock_updates(journal); + status = jbd2_journal_flush(journal); + jbd2_journal_unlock_updates(journal); if (status < 0) mlog_errno(status); @@ -1038,15 +1623,24 @@ static int ocfs2_replay_journal(struct ocfs2_super *osb, flags &= ~OCFS2_JOURNAL_DIRTY_FL; fe->id1.journal1.ij_flags = cpu_to_le32(flags); - status = ocfs2_write_block(osb, bh, inode); + /* Increment recovery generation to indicate successful recovery */ + ocfs2_bump_recovery_generation(fe); + osb->slot_recovery_generations[slot_num] = + ocfs2_get_recovery_generation(fe); + + ocfs2_compute_meta_ecc(osb->sb, bh->b_data, &fe->i_check); + status = ocfs2_write_block(osb, bh, INODE_CACHE(inode)); if (status < 0) mlog_errno(status); if (!igrab(inode)) BUG(); - journal_destroy(journal); + jbd2_journal_destroy(journal); + printk(KERN_NOTICE "ocfs2: End replay journal (node %d, slot %d) on "\ + "device (%u,%u)\n", node_num, slot_num, MAJOR(osb->sb->s_dev), + MINOR(osb->sb->s_dev)); done: /* drop the lock on this nodes journal */ if (got_lock) @@ -1055,10 +1649,8 @@ done: if (inode) iput(inode); - if (bh) - brelse(bh); + brelse(bh); - mlog_exit(status); return status; } @@ -1075,34 +1667,25 @@ done: * far less concerning. */ static int ocfs2_recover_node(struct ocfs2_super *osb, - int node_num) + int node_num, int slot_num) { int status = 0; - int slot_num; - struct ocfs2_slot_info *si = osb->slot_info; struct ocfs2_dinode *la_copy = NULL; struct ocfs2_dinode *tl_copy = NULL; - mlog_entry("(node_num=%d, osb->node_num = %d)\n", - node_num, osb->node_num); - - mlog(0, "checking node %d\n", node_num); + trace_ocfs2_recover_node(node_num, slot_num, osb->node_num); /* Should not ever be called to recover ourselves -- in that * case we should've called ocfs2_journal_load instead. */ BUG_ON(osb->node_num == node_num); - slot_num = ocfs2_node_num_to_slot(si, node_num); - if (slot_num == OCFS2_INVALID_SLOT) { - status = 0; - mlog(0, "no slot for this node, so no recovery required.\n"); - goto done; - } - - mlog(0, "node %d was using slot %d\n", node_num, slot_num); - status = ocfs2_replay_journal(osb, node_num, slot_num); if (status < 0) { + if (status == -EBUSY) { + trace_ocfs2_recover_node_skip(slot_num, node_num); + status = 0; + goto done; + } mlog_errno(status); goto done; } @@ -1123,19 +1706,17 @@ static int ocfs2_recover_node(struct ocfs2_super *osb, /* Likewise, this would be a strange but ultimately not so * harmful place to get an error... */ - ocfs2_clear_slot(si, slot_num); - status = ocfs2_update_disk_slots(osb, si); + status = ocfs2_clear_slot(osb, slot_num); if (status < 0) mlog_errno(status); /* This will kfree the memory pointed to by la_copy and tl_copy */ ocfs2_queue_recovery_completion(osb->journal, slot_num, la_copy, - tl_copy); + tl_copy, NULL); status = 0; done: - mlog_exit(status); return status; } @@ -1184,23 +1765,49 @@ bail: * slot info struct has been updated from disk. */ int ocfs2_mark_dead_nodes(struct ocfs2_super *osb) { - int status, i, node_num; - struct ocfs2_slot_info *si = osb->slot_info; + unsigned int node_num; + int status, i; + u32 gen; + struct buffer_head *bh = NULL; + struct ocfs2_dinode *di; /* This is called with the super block cluster lock, so we * know that the slot map can't change underneath us. */ - spin_lock(&si->si_lock); - for(i = 0; i < si->si_num_slots; i++) { - if (i == osb->slot_num) + for (i = 0; i < osb->max_slots; i++) { + /* Read journal inode to get the recovery generation */ + status = ocfs2_read_journal_inode(osb, i, &bh, NULL); + if (status) { + mlog_errno(status); + goto bail; + } + di = (struct ocfs2_dinode *)bh->b_data; + gen = ocfs2_get_recovery_generation(di); + brelse(bh); + bh = NULL; + + spin_lock(&osb->osb_lock); + osb->slot_recovery_generations[i] = gen; + + trace_ocfs2_mark_dead_nodes(i, + osb->slot_recovery_generations[i]); + + if (i == osb->slot_num) { + spin_unlock(&osb->osb_lock); continue; - if (ocfs2_is_empty_slot(si, i)) + } + + status = ocfs2_slot_to_node_num_locked(osb, i, &node_num); + if (status == -ENOENT) { + spin_unlock(&osb->osb_lock); continue; + } - node_num = si->si_global_node_nums[i]; - if (ocfs2_node_map_test_bit(osb, &osb->recovery_map, node_num)) + if (__ocfs2_recovery_map_test(osb, node_num)) { + spin_unlock(&osb->osb_lock); continue; - spin_unlock(&si->si_lock); + } + spin_unlock(&osb->osb_lock); /* Ok, we have a slot occupied by another node which * is not in the recovery map. We trylock his journal @@ -1215,18 +1822,162 @@ int ocfs2_mark_dead_nodes(struct ocfs2_super *osb) mlog_errno(status); goto bail; } - - spin_lock(&si->si_lock); } - spin_unlock(&si->si_lock); status = 0; bail: - mlog_exit(status); return status; } +/* + * Scan timer should get fired every ORPHAN_SCAN_SCHEDULE_TIMEOUT. Add some + * randomness to the timeout to minimize multple nodes firing the timer at the + * same time. + */ +static inline unsigned long ocfs2_orphan_scan_timeout(void) +{ + unsigned long time; + + get_random_bytes(&time, sizeof(time)); + time = ORPHAN_SCAN_SCHEDULE_TIMEOUT + (time % 5000); + return msecs_to_jiffies(time); +} + +/* + * ocfs2_queue_orphan_scan calls ocfs2_queue_recovery_completion for + * every slot, queuing a recovery of the slot on the ocfs2_wq thread. This + * is done to catch any orphans that are left over in orphan directories. + * + * It scans all slots, even ones that are in use. It does so to handle the + * case described below: + * + * Node 1 has an inode it was using. The dentry went away due to memory + * pressure. Node 1 closes the inode, but it's on the free list. The node + * has the open lock. + * Node 2 unlinks the inode. It grabs the dentry lock to notify others, + * but node 1 has no dentry and doesn't get the message. It trylocks the + * open lock, sees that another node has a PR, and does nothing. + * Later node 2 runs its orphan dir. It igets the inode, trylocks the + * open lock, sees the PR still, and does nothing. + * Basically, we have to trigger an orphan iput on node 1. The only way + * for this to happen is if node 1 runs node 2's orphan dir. + * + * ocfs2_queue_orphan_scan gets called every ORPHAN_SCAN_SCHEDULE_TIMEOUT + * seconds. It gets an EX lock on os_lockres and checks sequence number + * stored in LVB. If the sequence number has changed, it means some other + * node has done the scan. This node skips the scan and tracks the + * sequence number. If the sequence number didn't change, it means a scan + * hasn't happened. The node queues a scan and increments the + * sequence number in the LVB. + */ +void ocfs2_queue_orphan_scan(struct ocfs2_super *osb) +{ + struct ocfs2_orphan_scan *os; + int status, i; + u32 seqno = 0; + + os = &osb->osb_orphan_scan; + + if (atomic_read(&os->os_state) == ORPHAN_SCAN_INACTIVE) + goto out; + + trace_ocfs2_queue_orphan_scan_begin(os->os_count, os->os_seqno, + atomic_read(&os->os_state)); + + status = ocfs2_orphan_scan_lock(osb, &seqno); + if (status < 0) { + if (status != -EAGAIN) + mlog_errno(status); + goto out; + } + + /* Do no queue the tasks if the volume is being umounted */ + if (atomic_read(&os->os_state) == ORPHAN_SCAN_INACTIVE) + goto unlock; + + if (os->os_seqno != seqno) { + os->os_seqno = seqno; + goto unlock; + } + + for (i = 0; i < osb->max_slots; i++) + ocfs2_queue_recovery_completion(osb->journal, i, NULL, NULL, + NULL); + /* + * We queued a recovery on orphan slots, increment the sequence + * number and update LVB so other node will skip the scan for a while + */ + seqno++; + os->os_count++; + os->os_scantime = CURRENT_TIME; +unlock: + ocfs2_orphan_scan_unlock(osb, seqno); +out: + trace_ocfs2_queue_orphan_scan_end(os->os_count, os->os_seqno, + atomic_read(&os->os_state)); + return; +} + +/* Worker task that gets fired every ORPHAN_SCAN_SCHEDULE_TIMEOUT millsec */ +void ocfs2_orphan_scan_work(struct work_struct *work) +{ + struct ocfs2_orphan_scan *os; + struct ocfs2_super *osb; + + os = container_of(work, struct ocfs2_orphan_scan, + os_orphan_scan_work.work); + osb = os->os_osb; + + mutex_lock(&os->os_lock); + ocfs2_queue_orphan_scan(osb); + if (atomic_read(&os->os_state) == ORPHAN_SCAN_ACTIVE) + queue_delayed_work(ocfs2_wq, &os->os_orphan_scan_work, + ocfs2_orphan_scan_timeout()); + mutex_unlock(&os->os_lock); +} + +void ocfs2_orphan_scan_stop(struct ocfs2_super *osb) +{ + struct ocfs2_orphan_scan *os; + + os = &osb->osb_orphan_scan; + if (atomic_read(&os->os_state) == ORPHAN_SCAN_ACTIVE) { + atomic_set(&os->os_state, ORPHAN_SCAN_INACTIVE); + mutex_lock(&os->os_lock); + cancel_delayed_work(&os->os_orphan_scan_work); + mutex_unlock(&os->os_lock); + } +} + +void ocfs2_orphan_scan_init(struct ocfs2_super *osb) +{ + struct ocfs2_orphan_scan *os; + + os = &osb->osb_orphan_scan; + os->os_osb = osb; + os->os_count = 0; + os->os_seqno = 0; + mutex_init(&os->os_lock); + INIT_DELAYED_WORK(&os->os_orphan_scan_work, ocfs2_orphan_scan_work); +} + +void ocfs2_orphan_scan_start(struct ocfs2_super *osb) +{ + struct ocfs2_orphan_scan *os; + + os = &osb->osb_orphan_scan; + os->os_scantime = CURRENT_TIME; + if (ocfs2_is_hard_readonly(osb) || ocfs2_mount_local(osb)) + atomic_set(&os->os_state, ORPHAN_SCAN_INACTIVE); + else { + atomic_set(&os->os_state, ORPHAN_SCAN_ACTIVE); + queue_delayed_work(ocfs2_wq, &os->os_orphan_scan_work, + ocfs2_orphan_scan_timeout()); + } +} + struct ocfs2_orphan_filldir_priv { + struct dir_context ctx; struct inode *head; struct ocfs2_super *osb; }; @@ -1248,8 +1999,7 @@ static int ocfs2_orphan_filldir(void *priv, const char *name, int name_len, if (IS_ERR(iter)) return 0; - mlog(0, "queue orphan %llu\n", - (unsigned long long)OCFS2_I(iter)->ip_blkno); + trace_ocfs2_orphan_filldir((unsigned long long)OCFS2_I(iter)->ip_blkno); /* No locking is required for the next_orphan queue as there * is only ever a single process doing orphan recovery. */ OCFS2_I(iter)->ip_next_orphan = p->head; @@ -1264,11 +2014,11 @@ static int ocfs2_queue_orphans(struct ocfs2_super *osb, { int status; struct inode *orphan_dir_inode = NULL; - struct ocfs2_orphan_filldir_priv priv; - loff_t pos = 0; - - priv.osb = osb; - priv.head = *head; + struct ocfs2_orphan_filldir_priv priv = { + .ctx.actor = ocfs2_orphan_filldir, + .osb = osb, + .head = *head + }; orphan_dir_inode = ocfs2_get_system_file_inode(osb, ORPHAN_DIR_SYSTEM_INODE, @@ -1277,7 +2027,7 @@ static int ocfs2_queue_orphans(struct ocfs2_super *osb, status = -ENOENT; mlog_errno(status); return status; - } + } mutex_lock(&orphan_dir_inode->i_mutex); status = ocfs2_inode_lock(orphan_dir_inode, NULL, 0); @@ -1286,8 +2036,7 @@ static int ocfs2_queue_orphans(struct ocfs2_super *osb, goto out; } - status = ocfs2_dir_foreach(orphan_dir_inode, &pos, &priv, - ocfs2_orphan_filldir); + status = ocfs2_dir_foreach(orphan_dir_inode, &priv.ctx); if (status) { mlog_errno(status); goto out_cluster; @@ -1365,7 +2114,7 @@ static int ocfs2_recover_orphans(struct ocfs2_super *osb, struct inode *iter; struct ocfs2_inode_info *oi; - mlog(0, "Recover inodes from orphan dir in slot %d\n", slot); + trace_ocfs2_recover_orphans(slot); ocfs2_mark_recovering_orphan_dir(osb, slot); ret = ocfs2_queue_orphans(osb, slot, &inode); @@ -1378,17 +2127,12 @@ static int ocfs2_recover_orphans(struct ocfs2_super *osb, while (inode) { oi = OCFS2_I(inode); - mlog(0, "iput orphan %llu\n", (unsigned long long)oi->ip_blkno); + trace_ocfs2_recover_orphans_iput( + (unsigned long long)oi->ip_blkno); iter = oi->ip_next_orphan; spin_lock(&oi->ip_lock); - /* The remote delete code may have set these on the - * assumption that the other node would wipe them - * successfully. If they are still in the node's - * orphan dir, we need to reset that state. */ - oi->ip_flags &= ~(OCFS2_INODE_DELETED|OCFS2_INODE_SKIP_DELETE); - /* Set the proper information to get us going into * ocfs2_delete_inode. */ oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED; @@ -1402,19 +2146,21 @@ static int ocfs2_recover_orphans(struct ocfs2_super *osb, return ret; } -static int ocfs2_wait_on_mount(struct ocfs2_super *osb) +static int __ocfs2_wait_on_mount(struct ocfs2_super *osb, int quota) { /* This check is good because ocfs2 will wait on our recovery * thread before changing it to something other than MOUNTED * or DISABLED. */ wait_event(osb->osb_mount_event, - atomic_read(&osb->vol_state) == VOLUME_MOUNTED || + (!quota && atomic_read(&osb->vol_state) == VOLUME_MOUNTED) || + atomic_read(&osb->vol_state) == VOLUME_MOUNTED_QUOTAS || atomic_read(&osb->vol_state) == VOLUME_DISABLED); /* If there's an error on mount, then we may never get to the * MOUNTED flag, but this is set right before * dismount_volume() so we can trust it. */ if (atomic_read(&osb->vol_state) == VOLUME_DISABLED) { + trace_ocfs2_wait_on_mount(VOLUME_DISABLED); mlog(0, "mount error, exiting!\n"); return -EBUSY; } @@ -1440,8 +2186,20 @@ static int ocfs2_commit_thread(void *arg) || kthread_should_stop()); status = ocfs2_commit_cache(osb); - if (status < 0) - mlog_errno(status); + if (status < 0) { + static unsigned long abort_warn_time; + + /* Warn about this once per minute */ + if (printk_timed_ratelimit(&abort_warn_time, 60*HZ)) + mlog(ML_ERROR, "status = %d, journal is " + "already aborted.\n", status); + /* + * After ocfs2_commit_cache() fails, j_num_trans has a + * non-zero value. Sleep here to avoid a busy-wait + * loop. + */ + msleep_interruptible(1000); + } if (kthread_should_stop() && atomic_read(&journal->j_num_trans)){ mlog(ML_KTHREAD, @@ -1454,49 +2212,41 @@ static int ocfs2_commit_thread(void *arg) return 0; } -/* Look for a dirty journal without taking any cluster locks. Used for - * hard readonly access to determine whether the file system journals - * require recovery. */ +/* Reads all the journal inodes without taking any cluster locks. Used + * for hard readonly access to determine whether any journal requires + * recovery. Also used to refresh the recovery generation numbers after + * a journal has been recovered by another node. + */ int ocfs2_check_journals_nolocks(struct ocfs2_super *osb) { int ret = 0; unsigned int slot; - struct buffer_head *di_bh; + struct buffer_head *di_bh = NULL; struct ocfs2_dinode *di; - struct inode *journal = NULL; + int journal_dirty = 0; for(slot = 0; slot < osb->max_slots; slot++) { - journal = ocfs2_get_system_file_inode(osb, - JOURNAL_SYSTEM_INODE, - slot); - if (!journal || is_bad_inode(journal)) { - ret = -EACCES; - mlog_errno(ret); - goto out; - } - - di_bh = NULL; - ret = ocfs2_read_block(osb, OCFS2_I(journal)->ip_blkno, &di_bh, - 0, journal); - if (ret < 0) { + ret = ocfs2_read_journal_inode(osb, slot, &di_bh, NULL); + if (ret) { mlog_errno(ret); goto out; } di = (struct ocfs2_dinode *) di_bh->b_data; + osb->slot_recovery_generations[slot] = + ocfs2_get_recovery_generation(di); + if (le32_to_cpu(di->id1.journal1.ij_flags) & OCFS2_JOURNAL_DIRTY_FL) - ret = -EROFS; + journal_dirty = 1; brelse(di_bh); - if (ret) - break; + di_bh = NULL; } out: - if (journal) - iput(journal); - + if (journal_dirty) + ret = -EROFS; return ret; } |
