diff options
Diffstat (limited to 'fs/btrfs/async-thread.c')
| -rw-r--r-- | fs/btrfs/async-thread.c | 254 | 
1 file changed, 202 insertions, 52 deletions
diff --git a/fs/btrfs/async-thread.c b/fs/btrfs/async-thread.c index 019e8af449a..282ca085c2f 100644 --- a/fs/btrfs/async-thread.c +++ b/fs/btrfs/async-thread.c @@ -48,6 +48,9 @@ struct btrfs_worker_thread {  	/* number of things on the pending list */  	atomic_t num_pending; +	/* reference counter for this struct */ +	atomic_t refs; +  	unsigned long sequence;  	/* protects the pending list. */ @@ -71,7 +74,12 @@ static void check_idle_worker(struct btrfs_worker_thread *worker)  		unsigned long flags;  		spin_lock_irqsave(&worker->workers->lock, flags);  		worker->idle = 1; -		list_move(&worker->worker_list, &worker->workers->idle_list); + +		/* the list may be empty if the worker is just starting */ +		if (!list_empty(&worker->worker_list)) { +			list_move(&worker->worker_list, +				 &worker->workers->idle_list); +		}  		spin_unlock_irqrestore(&worker->workers->lock, flags);  	}  } @@ -87,23 +95,49 @@ static void check_busy_worker(struct btrfs_worker_thread *worker)  		unsigned long flags;  		spin_lock_irqsave(&worker->workers->lock, flags);  		worker->idle = 0; -		list_move_tail(&worker->worker_list, -			       &worker->workers->worker_list); + +		if (!list_empty(&worker->worker_list)) { +			list_move_tail(&worker->worker_list, +				      &worker->workers->worker_list); +		}  		spin_unlock_irqrestore(&worker->workers->lock, flags);  	}  } -static noinline int run_ordered_completions(struct btrfs_workers *workers, -					    struct btrfs_work *work) +static void check_pending_worker_creates(struct btrfs_worker_thread *worker)  { +	struct btrfs_workers *workers = worker->workers;  	unsigned long flags; +	rmb(); +	if (!workers->atomic_start_pending) +		return; + +	spin_lock_irqsave(&workers->lock, flags); +	if (!workers->atomic_start_pending) +		goto out; + +	workers->atomic_start_pending = 0; +	if (workers->num_workers >= workers->max_workers) +		goto out; + +	spin_unlock_irqrestore(&workers->lock, flags); +	btrfs_start_workers(workers, 1); +	return; + +out: +	
spin_unlock_irqrestore(&workers->lock, flags); +} + +static noinline int run_ordered_completions(struct btrfs_workers *workers, +					    struct btrfs_work *work) +{  	if (!workers->ordered)  		return 0;  	set_bit(WORK_DONE_BIT, &work->flags); -	spin_lock_irqsave(&workers->lock, flags); +	spin_lock(&workers->order_lock);  	while (1) {  		if (!list_empty(&workers->prio_order_list)) { @@ -126,45 +160,118 @@ static noinline int run_ordered_completions(struct btrfs_workers *workers,  		if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))  			break; -		spin_unlock_irqrestore(&workers->lock, flags); +		spin_unlock(&workers->order_lock);  		work->ordered_func(work);  		/* now take the lock again and call the freeing code */ -		spin_lock_irqsave(&workers->lock, flags); +		spin_lock(&workers->order_lock);  		list_del(&work->order_list);  		work->ordered_free(work);  	} -	spin_unlock_irqrestore(&workers->lock, flags); +	spin_unlock(&workers->order_lock);  	return 0;  } +static void put_worker(struct btrfs_worker_thread *worker) +{ +	if (atomic_dec_and_test(&worker->refs)) +		kfree(worker); +} + +static int try_worker_shutdown(struct btrfs_worker_thread *worker) +{ +	int freeit = 0; + +	spin_lock_irq(&worker->lock); +	spin_lock(&worker->workers->lock); +	if (worker->workers->num_workers > 1 && +	    worker->idle && +	    !worker->working && +	    !list_empty(&worker->worker_list) && +	    list_empty(&worker->prio_pending) && +	    list_empty(&worker->pending) && +	    atomic_read(&worker->num_pending) == 0) { +		freeit = 1; +		list_del_init(&worker->worker_list); +		worker->workers->num_workers--; +	} +	spin_unlock(&worker->workers->lock); +	spin_unlock_irq(&worker->lock); + +	if (freeit) +		put_worker(worker); +	return freeit; +} + +static struct btrfs_work *get_next_work(struct btrfs_worker_thread *worker, +					struct list_head *prio_head, +					struct list_head *head) +{ +	struct btrfs_work *work = NULL; +	struct list_head *cur = NULL; + +	if(!list_empty(prio_head)) +	
	cur = prio_head->next; + +	smp_mb(); +	if (!list_empty(&worker->prio_pending)) +		goto refill; + +	if (!list_empty(head)) +		cur = head->next; + +	if (cur) +		goto out; + +refill: +	spin_lock_irq(&worker->lock); +	list_splice_tail_init(&worker->prio_pending, prio_head); +	list_splice_tail_init(&worker->pending, head); + +	if (!list_empty(prio_head)) +		cur = prio_head->next; +	else if (!list_empty(head)) +		cur = head->next; +	spin_unlock_irq(&worker->lock); + +	if (!cur) +		goto out_fail; + +out: +	work = list_entry(cur, struct btrfs_work, list); + +out_fail: +	return work; +} +  /*   * main loop for servicing work items   */  static int worker_loop(void *arg)  {  	struct btrfs_worker_thread *worker = arg; -	struct list_head *cur; +	struct list_head head; +	struct list_head prio_head;  	struct btrfs_work *work; + +	INIT_LIST_HEAD(&head); +	INIT_LIST_HEAD(&prio_head); +  	do { -		spin_lock_irq(&worker->lock); -again_locked: +again:  		while (1) { -			if (!list_empty(&worker->prio_pending)) -				cur = worker->prio_pending.next; -			else if (!list_empty(&worker->pending)) -				cur = worker->pending.next; -			else + + +			work = get_next_work(worker, &prio_head, &head); +			if (!work)  				break; -			work = list_entry(cur, struct btrfs_work, list);  			list_del(&work->list);  			clear_bit(WORK_QUEUED_BIT, &work->flags);  			work->worker = worker; -			spin_unlock_irq(&worker->lock);  			work->func(work); @@ -175,9 +282,13 @@ again_locked:  			 */  			run_ordered_completions(worker->workers, work); -			spin_lock_irq(&worker->lock); -			check_idle_worker(worker); +			check_pending_worker_creates(worker); +  		} + +		spin_lock_irq(&worker->lock); +		check_idle_worker(worker); +  		if (freezing(current)) {  			worker->working = 0;  			spin_unlock_irq(&worker->lock); @@ -216,8 +327,10 @@ again_locked:  				spin_lock_irq(&worker->lock);  				set_current_state(TASK_INTERRUPTIBLE);  				if (!list_empty(&worker->pending) || -				    !list_empty(&worker->prio_pending)) -					
goto again_locked; +				    !list_empty(&worker->prio_pending)) { +					spin_unlock_irq(&worker->lock); +					goto again; +				}  				/*  				 * this makes sure we get a wakeup when someone @@ -226,8 +339,13 @@ again_locked:  				worker->working = 0;  				spin_unlock_irq(&worker->lock); -				if (!kthread_should_stop()) -					schedule(); +				if (!kthread_should_stop()) { +					schedule_timeout(HZ * 120); +					if (!worker->working && +					    try_worker_shutdown(worker)) { +						return 0; +					} +				}  			}  			__set_current_state(TASK_RUNNING);  		} @@ -242,16 +360,30 @@ int btrfs_stop_workers(struct btrfs_workers *workers)  {  	struct list_head *cur;  	struct btrfs_worker_thread *worker; +	int can_stop; +	spin_lock_irq(&workers->lock);  	list_splice_init(&workers->idle_list, &workers->worker_list);  	while (!list_empty(&workers->worker_list)) {  		cur = workers->worker_list.next;  		worker = list_entry(cur, struct btrfs_worker_thread,  				    worker_list); -		kthread_stop(worker->task); -		list_del(&worker->worker_list); -		kfree(worker); + +		atomic_inc(&worker->refs); +		workers->num_workers -= 1; +		if (!list_empty(&worker->worker_list)) { +			list_del_init(&worker->worker_list); +			put_worker(worker); +			can_stop = 1; +		} else +			can_stop = 0; +		spin_unlock_irq(&workers->lock); +		if (can_stop) +			kthread_stop(worker->task); +		spin_lock_irq(&workers->lock); +		put_worker(worker);  	} +	spin_unlock_irq(&workers->lock);  	return 0;  } @@ -266,10 +398,13 @@ void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max)  	INIT_LIST_HEAD(&workers->order_list);  	INIT_LIST_HEAD(&workers->prio_order_list);  	spin_lock_init(&workers->lock); +	spin_lock_init(&workers->order_lock);  	workers->max_workers = max;  	workers->idle_thresh = 32;  	workers->name = name;  	workers->ordered = 0; +	workers->atomic_start_pending = 0; +	workers->atomic_worker_start = 0;  }  /* @@ -293,7 +428,9 @@ int btrfs_start_workers(struct btrfs_workers *workers, int 
num_workers)  		INIT_LIST_HEAD(&worker->prio_pending);  		INIT_LIST_HEAD(&worker->worker_list);  		spin_lock_init(&worker->lock); +  		atomic_set(&worker->num_pending, 0); +		atomic_set(&worker->refs, 1);  		worker->workers = workers;  		worker->task = kthread_run(worker_loop, worker,  					   "btrfs-%s-%d", workers->name, @@ -303,7 +440,6 @@ int btrfs_start_workers(struct btrfs_workers *workers, int num_workers)  			kfree(worker);  			goto fail;  		} -  		spin_lock_irq(&workers->lock);  		list_add_tail(&worker->worker_list, &workers->idle_list);  		worker->idle = 1; @@ -350,7 +486,6 @@ static struct btrfs_worker_thread *next_worker(struct btrfs_workers *workers)  	 */  	next = workers->worker_list.next;  	worker = list_entry(next, struct btrfs_worker_thread, worker_list); -	atomic_inc(&worker->num_pending);  	worker->sequence++;  	if (worker->sequence % workers->idle_thresh == 0) @@ -367,28 +502,18 @@ static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers)  {  	struct btrfs_worker_thread *worker;  	unsigned long flags; +	struct list_head *fallback;  again:  	spin_lock_irqsave(&workers->lock, flags);  	worker = next_worker(workers); -	spin_unlock_irqrestore(&workers->lock, flags);  	if (!worker) { -		spin_lock_irqsave(&workers->lock, flags);  		if (workers->num_workers >= workers->max_workers) { -			struct list_head *fallback = NULL; -			/* -			 * we have failed to find any workers, just -			 * return the force one -			 */ -			if (!list_empty(&workers->worker_list)) -				fallback = workers->worker_list.next; -			if (!list_empty(&workers->idle_list)) -				fallback = workers->idle_list.next; -			BUG_ON(!fallback); -			worker = list_entry(fallback, -				  struct btrfs_worker_thread, worker_list); -			spin_unlock_irqrestore(&workers->lock, flags); +			goto fallback; +		} else if (workers->atomic_worker_start) { +			workers->atomic_start_pending = 1; +			goto fallback;  		} else {  			spin_unlock_irqrestore(&workers->lock, flags);  			/* we're 
below the limit, start another worker */ @@ -396,6 +521,28 @@ again:  			goto again;  		}  	} +	goto found; + +fallback: +	fallback = NULL; +	/* +	 * we have failed to find any workers, just +	 * return the first one we can find. +	 */ +	if (!list_empty(&workers->worker_list)) +		fallback = workers->worker_list.next; +	if (!list_empty(&workers->idle_list)) +		fallback = workers->idle_list.next; +	BUG_ON(!fallback); +	worker = list_entry(fallback, +		  struct btrfs_worker_thread, worker_list); +found: +	/* +	 * this makes sure the worker doesn't exit before it is placed +	 * onto a busy/idle list +	 */ +	atomic_inc(&worker->num_pending); +	spin_unlock_irqrestore(&workers->lock, flags);  	return worker;  } @@ -427,7 +574,7 @@ int btrfs_requeue_work(struct btrfs_work *work)  		spin_lock(&worker->workers->lock);  		worker->idle = 0;  		list_move_tail(&worker->worker_list, -			       &worker->workers->worker_list); +			      &worker->workers->worker_list);  		spin_unlock(&worker->workers->lock);  	}  	if (!worker->working) { @@ -435,9 +582,9 @@ int btrfs_requeue_work(struct btrfs_work *work)  		worker->working = 1;  	} -	spin_unlock_irqrestore(&worker->lock, flags);  	if (wake)  		wake_up_process(worker->task); +	spin_unlock_irqrestore(&worker->lock, flags);  out:  	return 0; @@ -463,14 +610,18 @@ int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)  	worker = find_worker(workers);  	if (workers->ordered) { -		spin_lock_irqsave(&workers->lock, flags); +		/* +		 * you're not allowed to do ordered queues from an +		 * interrupt handler +		 */ +		spin_lock(&workers->order_lock);  		if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags)) {  			list_add_tail(&work->order_list,  				      &workers->prio_order_list);  		} else {  			list_add_tail(&work->order_list, &workers->order_list);  		} -		spin_unlock_irqrestore(&workers->lock, flags); +		spin_unlock(&workers->order_lock);  	} else {  		INIT_LIST_HEAD(&work->order_list);  	} @@ -481,7 +632,6 @@ int 
btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)  		list_add_tail(&work->list, &worker->prio_pending);  	else  		list_add_tail(&work->list, &worker->pending); -	atomic_inc(&worker->num_pending);  	check_busy_worker(worker);  	/* @@ -492,10 +642,10 @@ int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)  		wake = 1;  	worker->working = 1; -	spin_unlock_irqrestore(&worker->lock, flags); -  	if (wake)  		wake_up_process(worker->task); +	spin_unlock_irqrestore(&worker->lock, flags); +  out:  	return 0;  }  | 
