Diffstat (limited to 'fs/btrfs/locking.c')
-rw-r--r--   fs/btrfs/locking.c | 361
1 file changed, 205 insertions, 156 deletions
diff --git a/fs/btrfs/locking.c b/fs/btrfs/locking.c
index 6151f2ea38b..5665d214924 100644
--- a/fs/btrfs/locking.c
+++ b/fs/btrfs/locking.c
@@ -24,210 +24,259 @@
 #include "extent_io.h"
 #include "locking.h"
 
-static inline void spin_nested(struct extent_buffer *eb)
-{
-	spin_lock(&eb->lock);
-}
+static void btrfs_assert_tree_read_locked(struct extent_buffer *eb);
 
 /*
- * Setting a lock to blocking will drop the spinlock and set the
- * flag that forces other procs who want the lock to wait.  After
- * this you can safely schedule with the lock held.
+ * if we currently have a spinning reader or writer lock
+ * (indicated by the rw flag) this will bump the count
+ * of blocking holders and drop the spinlock.
  */
-void btrfs_set_lock_blocking(struct extent_buffer *eb)
+void btrfs_set_lock_blocking_rw(struct extent_buffer *eb, int rw)
 {
-	if (!test_bit(EXTENT_BUFFER_BLOCKING, &eb->bflags)) {
-		set_bit(EXTENT_BUFFER_BLOCKING, &eb->bflags);
-		spin_unlock(&eb->lock);
+	/*
+	 * no lock is required.  The lock owner may change if
+	 * we have a read lock, but it won't change to or away
+	 * from us.  If we have the write lock, we are the owner
+	 * and it'll never change.
+	 */
+	if (eb->lock_nested && current->pid == eb->lock_owner)
+		return;
+	if (rw == BTRFS_WRITE_LOCK) {
+		if (atomic_read(&eb->blocking_writers) == 0) {
+			WARN_ON(atomic_read(&eb->spinning_writers) != 1);
+			atomic_dec(&eb->spinning_writers);
+			btrfs_assert_tree_locked(eb);
+			atomic_inc(&eb->blocking_writers);
+			write_unlock(&eb->lock);
+		}
+	} else if (rw == BTRFS_READ_LOCK) {
+		btrfs_assert_tree_read_locked(eb);
+		atomic_inc(&eb->blocking_readers);
+		WARN_ON(atomic_read(&eb->spinning_readers) == 0);
+		atomic_dec(&eb->spinning_readers);
+		read_unlock(&eb->lock);
 	}
-	/* exit with the spin lock released and the bit set */
+	return;
 }
 
 /*
- * clearing the blocking flag will take the spinlock again.
- * After this you can't safely schedule
+ * if we currently have a blocking lock, take the spinlock
+ * and drop our blocking count
  */
-void btrfs_clear_lock_blocking(struct extent_buffer *eb)
+void btrfs_clear_lock_blocking_rw(struct extent_buffer *eb, int rw)
 {
-	if (test_bit(EXTENT_BUFFER_BLOCKING, &eb->bflags)) {
-		spin_nested(eb);
-		clear_bit(EXTENT_BUFFER_BLOCKING, &eb->bflags);
-		smp_mb__after_clear_bit();
+	/*
+	 * no lock is required.  The lock owner may change if
+	 * we have a read lock, but it won't change to or away
+	 * from us.  If we have the write lock, we are the owner
+	 * and it'll never change.
+	 */
+	if (eb->lock_nested && current->pid == eb->lock_owner)
+		return;
+
+	if (rw == BTRFS_WRITE_LOCK_BLOCKING) {
+		BUG_ON(atomic_read(&eb->blocking_writers) != 1);
+		write_lock(&eb->lock);
+		WARN_ON(atomic_read(&eb->spinning_writers));
+		atomic_inc(&eb->spinning_writers);
+		if (atomic_dec_and_test(&eb->blocking_writers) &&
+		    waitqueue_active(&eb->write_lock_wq))
+			wake_up(&eb->write_lock_wq);
+	} else if (rw == BTRFS_READ_LOCK_BLOCKING) {
+		BUG_ON(atomic_read(&eb->blocking_readers) == 0);
+		read_lock(&eb->lock);
+		atomic_inc(&eb->spinning_readers);
+		if (atomic_dec_and_test(&eb->blocking_readers) &&
+		    waitqueue_active(&eb->read_lock_wq))
+			wake_up(&eb->read_lock_wq);
 	}
-	/* exit with the spin lock held */
+	return;
 }
 
 /*
- * unfortunately, many of the places that currently set a lock to blocking
- * don't end up blocking for very long, and often they don't block
- * at all.  For a dbench 50 run, if we don't spin on the blocking bit
- * at all, the context switch rate can jump up to 400,000/sec or more.
- *
- * So, we're still stuck with this crummy spin on the blocking bit,
- * at least until the most common causes of the short blocks
- * can be dealt with.
+ * take a spinning read lock.  This will wait for any blocking
+ * writers
  */
-static int btrfs_spin_on_block(struct extent_buffer *eb)
+void btrfs_tree_read_lock(struct extent_buffer *eb)
 {
-	int i;
-
-	for (i = 0; i < 512; i++) {
-		if (!test_bit(EXTENT_BUFFER_BLOCKING, &eb->bflags))
-			return 1;
-		if (need_resched())
-			break;
-		cpu_relax();
+again:
+	BUG_ON(!atomic_read(&eb->blocking_writers) &&
+	       current->pid == eb->lock_owner);
+
+	read_lock(&eb->lock);
+	if (atomic_read(&eb->blocking_writers) &&
+	    current->pid == eb->lock_owner) {
+		/*
+		 * This extent is already write-locked by our thread. We allow
+		 * an additional read lock to be added because it's for the same
+		 * thread. btrfs_find_all_roots() depends on this as it may be
+		 * called on a partly (write-)locked tree.
+		 */
+		BUG_ON(eb->lock_nested);
+		eb->lock_nested = 1;
+		read_unlock(&eb->lock);
+		return;
 	}
-	return 0;
+	if (atomic_read(&eb->blocking_writers)) {
+		read_unlock(&eb->lock);
+		wait_event(eb->write_lock_wq,
+			   atomic_read(&eb->blocking_writers) == 0);
+		goto again;
+	}
+	atomic_inc(&eb->read_locks);
+	atomic_inc(&eb->spinning_readers);
 }
 
 /*
- * This is somewhat different from trylock.  It will take the
- * spinlock but if it finds the lock is set to blocking, it will
- * return without the lock held.
- *
- * returns 1 if it was able to take the lock and zero otherwise
- *
- * After this call, scheduling is not safe without first calling
- * btrfs_set_lock_blocking()
+ * returns 1 if we get the read lock and 0 if we don't
+ * this won't wait for blocking writers
  */
-int btrfs_try_spin_lock(struct extent_buffer *eb)
+int btrfs_try_tree_read_lock(struct extent_buffer *eb)
 {
-	int i;
+	if (atomic_read(&eb->blocking_writers))
+		return 0;
 
-	if (btrfs_spin_on_block(eb)) {
-		spin_nested(eb);
-		if (!test_bit(EXTENT_BUFFER_BLOCKING, &eb->bflags))
-			return 1;
-		spin_unlock(&eb->lock);
-	}
-	/* spin for a bit on the BLOCKING flag */
-	for (i = 0; i < 2; i++) {
-		cpu_relax();
-		if (!btrfs_spin_on_block(eb))
-			break;
-
-		spin_nested(eb);
-		if (!test_bit(EXTENT_BUFFER_BLOCKING, &eb->bflags))
-			return 1;
-		spin_unlock(&eb->lock);
+	if (!read_trylock(&eb->lock))
+		return 0;
+
+	if (atomic_read(&eb->blocking_writers)) {
+		read_unlock(&eb->lock);
+		return 0;
 	}
-	return 0;
+	atomic_inc(&eb->read_locks);
+	atomic_inc(&eb->spinning_readers);
+	return 1;
 }
 
 /*
- * the autoremove wake function will return 0 if it tried to wake up
- * a process that was already awake, which means that process won't
- * count as an exclusive wakeup.  The waitq code will continue waking
- * procs until it finds one that was actually sleeping.
- *
- * For btrfs, this isn't quite what we want.  We want a single proc
- * to be notified that the lock is ready for taking.  If that proc
- * already happen to be awake, great, it will loop around and try for
- * the lock.
- *
- * So, btrfs_wake_function always returns 1, even when the proc that we
- * tried to wake up was already awake.
+ * returns 1 if we get the read lock and 0 if we don't
+ * this won't wait for blocking writers or readers
  */
-static int btrfs_wake_function(wait_queue_t *wait, unsigned mode,
-			       int sync, void *key)
+int btrfs_try_tree_write_lock(struct extent_buffer *eb)
 {
-	autoremove_wake_function(wait, mode, sync, key);
+	if (atomic_read(&eb->blocking_writers) ||
+	    atomic_read(&eb->blocking_readers))
+		return 0;
+
+	if (!write_trylock(&eb->lock))
+		return 0;
+
+	if (atomic_read(&eb->blocking_writers) ||
+	    atomic_read(&eb->blocking_readers)) {
+		write_unlock(&eb->lock);
+		return 0;
+	}
+	atomic_inc(&eb->write_locks);
+	atomic_inc(&eb->spinning_writers);
+	eb->lock_owner = current->pid;
 	return 1;
 }
 
 /*
- * returns with the extent buffer spinlocked.
- *
- * This will spin and/or wait as required to take the lock, and then
- * return with the spinlock held.
- *
- * After this call, scheduling is not safe without first calling
- * btrfs_set_lock_blocking()
+ * drop a spinning read lock
  */
-int btrfs_tree_lock(struct extent_buffer *eb)
+void btrfs_tree_read_unlock(struct extent_buffer *eb)
 {
-	DEFINE_WAIT(wait);
-	wait.func = btrfs_wake_function;
-
-	if (!btrfs_spin_on_block(eb))
-		goto sleep;
-
-	while(1) {
-		spin_nested(eb);
-
-		/* nobody is blocking, exit with the spinlock held */
-		if (!test_bit(EXTENT_BUFFER_BLOCKING, &eb->bflags))
-			return 0;
-
-		/*
-		 * we have the spinlock, but the real owner is blocking.
-		 * wait for them
-		 */
-		spin_unlock(&eb->lock);
-
-		/*
-		 * spin for a bit, and if the blocking flag goes away,
-		 * loop around
-		 */
-		cpu_relax();
-		if (btrfs_spin_on_block(eb))
-			continue;
-sleep:
-		prepare_to_wait_exclusive(&eb->lock_wq, &wait,
-					  TASK_UNINTERRUPTIBLE);
-
-		if (test_bit(EXTENT_BUFFER_BLOCKING, &eb->bflags))
-			schedule();
+	/*
+	 * if we're nested, we have the write lock.  No new locking
+	 * is needed as long as we are the lock owner.
+	 * The write unlock will do a barrier for us, and the lock_nested
+	 * field only matters to the lock owner.
+	 */
+	if (eb->lock_nested && current->pid == eb->lock_owner) {
+		eb->lock_nested = 0;
+		return;
+	}
+	btrfs_assert_tree_read_locked(eb);
+	WARN_ON(atomic_read(&eb->spinning_readers) == 0);
+	atomic_dec(&eb->spinning_readers);
+	atomic_dec(&eb->read_locks);
+	read_unlock(&eb->lock);
+}
 
-		finish_wait(&eb->lock_wq, &wait);
+/*
+ * drop a blocking read lock
+ */
+void btrfs_tree_read_unlock_blocking(struct extent_buffer *eb)
+{
+	/*
+	 * if we're nested, we have the write lock.  No new locking
+	 * is needed as long as we are the lock owner.
+	 * The write unlock will do a barrier for us, and the lock_nested
+	 * field only matters to the lock owner.
+	 */
+	if (eb->lock_nested && current->pid == eb->lock_owner) {
+		eb->lock_nested = 0;
+		return;
 	}
-	return 0;
+	btrfs_assert_tree_read_locked(eb);
+	WARN_ON(atomic_read(&eb->blocking_readers) == 0);
+	if (atomic_dec_and_test(&eb->blocking_readers) &&
+	    waitqueue_active(&eb->read_lock_wq))
+		wake_up(&eb->read_lock_wq);
+	atomic_dec(&eb->read_locks);
 }
 
 /*
- * Very quick trylock, this does not spin or schedule.  It returns
- * 1 with the spinlock held if it was able to take the lock, or it
- * returns zero if it was unable to take the lock.
- *
- * After this call, scheduling is not safe without first calling
- * btrfs_set_lock_blocking()
+ * take a spinning write lock.  This will wait for both
+ * blocking readers or writers
  */
-int btrfs_try_tree_lock(struct extent_buffer *eb)
+void btrfs_tree_lock(struct extent_buffer *eb)
 {
-	if (spin_trylock(&eb->lock)) {
-		if (test_bit(EXTENT_BUFFER_BLOCKING, &eb->bflags)) {
-			/*
-			 * we've got the spinlock, but the real owner is
-			 * blocking.  Drop the spinlock and return failure
-			 */
-			spin_unlock(&eb->lock);
-			return 0;
-		}
-		return 1;
+again:
+	wait_event(eb->read_lock_wq, atomic_read(&eb->blocking_readers) == 0);
+	wait_event(eb->write_lock_wq, atomic_read(&eb->blocking_writers) == 0);
+	write_lock(&eb->lock);
+	if (atomic_read(&eb->blocking_readers)) {
+		write_unlock(&eb->lock);
+		wait_event(eb->read_lock_wq,
+			   atomic_read(&eb->blocking_readers) == 0);
+		goto again;
 	}
-	/* someone else has the spinlock giveup */
-	return 0;
+	if (atomic_read(&eb->blocking_writers)) {
+		write_unlock(&eb->lock);
+		wait_event(eb->write_lock_wq,
+			   atomic_read(&eb->blocking_writers) == 0);
+		goto again;
+	}
+	WARN_ON(atomic_read(&eb->spinning_writers));
+	atomic_inc(&eb->spinning_writers);
+	atomic_inc(&eb->write_locks);
+	eb->lock_owner = current->pid;
 }
 
-int btrfs_tree_unlock(struct extent_buffer *eb)
+/*
+ * drop a spinning or a blocking write lock.
+ */
+void btrfs_tree_unlock(struct extent_buffer *eb)
 {
-	/*
-	 * if we were a blocking owner, we don't have the spinlock held
-	 * just clear the bit and look for waiters
-	 */
-	if (test_and_clear_bit(EXTENT_BUFFER_BLOCKING, &eb->bflags))
-		smp_mb__after_clear_bit();
-	else
-		spin_unlock(&eb->lock);
-
-	if (waitqueue_active(&eb->lock_wq))
-		wake_up(&eb->lock_wq);
-	return 0;
+	int blockers = atomic_read(&eb->blocking_writers);
+
+	BUG_ON(blockers > 1);
+
+	btrfs_assert_tree_locked(eb);
+	eb->lock_owner = 0;
+	atomic_dec(&eb->write_locks);
+
+	if (blockers) {
+		WARN_ON(atomic_read(&eb->spinning_writers));
+		atomic_dec(&eb->blocking_writers);
+		smp_mb();
+		if (waitqueue_active(&eb->write_lock_wq))
+			wake_up(&eb->write_lock_wq);
+	} else {
+		WARN_ON(atomic_read(&eb->spinning_writers) != 1);
+		atomic_dec(&eb->spinning_writers);
+		write_unlock(&eb->lock);
+	}
 }
 
 void btrfs_assert_tree_locked(struct extent_buffer *eb)
 {
-	if (!test_bit(EXTENT_BUFFER_BLOCKING, &eb->bflags))
-		assert_spin_locked(&eb->lock);
+	BUG_ON(!atomic_read(&eb->write_locks));
+}
+
+static void btrfs_assert_tree_read_locked(struct extent_buffer *eb)
+{
+	BUG_ON(!atomic_read(&eb->read_locks));
 }
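A note on the calling convention this patch establishes: a writer takes the spinning lock with btrfs_tree_lock(), converts it to a blocking lock before doing anything that may schedule, and converts back (or unlocks directly, since btrfs_tree_unlock() above handles both the spinning and the blocking case). A minimal sketch of that pattern, assuming the BTRFS_WRITE_LOCK and BTRFS_WRITE_LOCK_BLOCKING constants from fs/btrfs/locking.h; update_eb() and update_item() are hypothetical helpers for illustration, not code from this patch:

/* hypothetical caller, for illustration only */
static void update_eb(struct extent_buffer *eb)
{
	btrfs_tree_lock(eb);		/* spinning write lock */

	/* about to do work that may schedule: convert to blocking */
	btrfs_set_lock_blocking_rw(eb, BTRFS_WRITE_LOCK);
	update_item(eb);		/* sleeping is safe here */

	/* convert back to a spinning lock before the final unlock */
	btrfs_clear_lock_blocking_rw(eb, BTRFS_WRITE_LOCK_BLOCKING);
	btrfs_tree_unlock(eb);
}

The clear step is optional: calling btrfs_tree_unlock() while still blocking takes the blockers branch above, which decrements blocking_writers and wakes waiters instead of calling write_unlock().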

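The lock_nested path in btrfs_tree_read_lock() is the subtlest part of the change: the thread that already holds the blocking write lock is allowed one additional read lock on the same buffer, which btrfs_find_all_roots() depends on when walking a partly write-locked tree. A sketch of that sequence under the same assumptions (worker() is hypothetical):

/* hypothetical caller, for illustration only */
static void worker(struct extent_buffer *eb)
{
	btrfs_tree_lock(eb);
	btrfs_set_lock_blocking_rw(eb, BTRFS_WRITE_LOCK);

	/*
	 * Re-entry by the owning thread: blocking_writers != 0 and
	 * lock_owner == current->pid, so btrfs_tree_read_lock() sets
	 * eb->lock_nested and returns instead of waiting forever for
	 * blocking_writers to drain.
	 */
	btrfs_tree_read_lock(eb);
	/* ... read-only walk of the buffer ... */
	btrfs_tree_read_unlock(eb);	/* just clears lock_nested */

	btrfs_tree_unlock(eb);		/* drops the blocking write lock */
}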