Diffstat (limited to 'kernel/trace/ring_buffer.c')
-rw-r--r--	kernel/trace/ring_buffer.c	1661
1 file changed, 1295 insertions, 366 deletions
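The largest functional change in the hunks below is that buffer sizing becomes per CPU: ring_buffer_resize() gains a cpu_id argument (RING_BUFFER_ALL_CPUS resizes every CPU buffer) and ring_buffer_size() now takes a cpu and reports that CPU buffer's size. A minimal sketch of a hypothetical caller follows, assuming only the prototypes exported from <linux/ring_buffer.h>; the helper name example_grow_buffer() is illustrative and not part of this patch.

#include <linux/ring_buffer.h>

/* Illustrative only: double one CPU's buffer using the new interfaces. */
static int example_grow_buffer(struct ring_buffer *buffer, int cpu)
{
	unsigned long new_size = 2 * ring_buffer_size(buffer, cpu);
	int ret;

	/* Pass RING_BUFFER_ALL_CPUS instead of cpu to resize all buffers. */
	ret = ring_buffer_resize(buffer, new_size, cpu);

	return ret < 0 ? ret : 0;
}

Note that the reworked ring_buffer_resize() reports failure as a negative error code such as -ENOMEM or -EBUSY (the old code returned -1), so callers should test for a negative return.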
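The patch also gives readers a way to block for data: ring_buffer_wait() and ring_buffer_poll_wait() park the caller on a per-buffer or per-CPU wait queue, and the writer side wakes it from rb_wakeups() via irq_work. Below is a hedged sketch of a consumer that combines the new wait call with the existing ring_buffer_consume(); the helper name example_drain() and the empty processing body are illustrative assumptions.

#include <linux/ring_buffer.h>

/* Illustrative only: sleep until @cpu's buffer has data, then drain it. */
static void example_drain(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_event *event;
	unsigned long lost;
	u64 ts;

	if (ring_buffer_wait(buffer, cpu) < 0)
		return;		/* -ENODEV: no such CPU buffer */

	while ((event = ring_buffer_consume(buffer, cpu, &ts, &lost)) != NULL) {
		void *data = ring_buffer_event_data(event);

		/* process @data here; @ts and @lost describe the event */
		(void)data;
	}
}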
diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c index 9ed509a015d..ff7027199a9 100644 --- a/kernel/trace/ring_buffer.c +++ b/kernel/trace/ring_buffer.c @@ -3,17 +3,21 @@   *   * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>   */ +#include <linux/ftrace_event.h>  #include <linux/ring_buffer.h>  #include <linux/trace_clock.h> -#include <linux/ftrace_irq.h> +#include <linux/trace_seq.h>  #include <linux/spinlock.h> +#include <linux/irq_work.h>  #include <linux/debugfs.h>  #include <linux/uaccess.h>  #include <linux/hardirq.h> +#include <linux/kthread.h>	/* for self test */  #include <linux/kmemcheck.h>  #include <linux/module.h>  #include <linux/percpu.h>  #include <linux/mutex.h> +#include <linux/delay.h>  #include <linux/slab.h>  #include <linux/init.h>  #include <linux/hash.h> @@ -22,7 +26,8 @@  #include <linux/fs.h>  #include <asm/local.h> -#include "trace.h" + +static void update_pages_handler(struct work_struct *work);  /*   * The ring buffer header is special. We must manually up keep it. @@ -31,11 +36,11 @@ int ring_buffer_print_entry_header(struct trace_seq *s)  {  	int ret; -	ret = trace_seq_printf(s, "# compressed entry header\n"); -	ret = trace_seq_printf(s, "\ttype_len    :    5 bits\n"); -	ret = trace_seq_printf(s, "\ttime_delta  :   27 bits\n"); -	ret = trace_seq_printf(s, "\tarray       :   32 bits\n"); -	ret = trace_seq_printf(s, "\n"); +	ret = trace_seq_puts(s, "# compressed entry header\n"); +	ret = trace_seq_puts(s, "\ttype_len    :    5 bits\n"); +	ret = trace_seq_puts(s, "\ttime_delta  :   27 bits\n"); +	ret = trace_seq_puts(s, "\tarray       :   32 bits\n"); +	ret = trace_seq_putc(s, '\n');  	ret = trace_seq_printf(s, "\tpadding     : type == %d\n",  			       RINGBUF_TYPE_PADDING);  	ret = trace_seq_printf(s, "\ttime_extend : type == %d\n", @@ -155,33 +160,10 @@ enum {  static unsigned long ring_buffer_flags __read_mostly = RB_BUFFERS_ON; -#define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data) +/* Used for individual buffers (after the counter) */ +#define RB_BUFFER_OFF		(1 << 20) -/** - * tracing_on - enable all tracing buffers - * - * This function enables all tracing buffers that may have been - * disabled with tracing_off. - */ -void tracing_on(void) -{ -	set_bit(RB_BUFFERS_ON_BIT, &ring_buffer_flags); -} -EXPORT_SYMBOL_GPL(tracing_on); - -/** - * tracing_off - turn off all tracing buffers - * - * This function stops all tracing buffers from recording data. - * It does not disable any overhead the tracers themselves may - * be causing. This function simply causes all recording to - * the ring buffers to fail. 
- */ -void tracing_off(void) -{ -	clear_bit(RB_BUFFERS_ON_BIT, &ring_buffer_flags); -} -EXPORT_SYMBOL_GPL(tracing_off); +#define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data)  /**   * tracing_off_permanent - permanently disable ring buffers @@ -194,21 +176,12 @@ void tracing_off_permanent(void)  	set_bit(RB_BUFFERS_DISABLED_BIT, &ring_buffer_flags);  } -/** - * tracing_is_on - show state of ring buffers enabled - */ -int tracing_is_on(void) -{ -	return ring_buffer_flags == RB_BUFFERS_ON; -} -EXPORT_SYMBOL_GPL(tracing_is_on); -  #define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array))  #define RB_ALIGNMENT		4U  #define RB_MAX_SMALL_DATA	(RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX)  #define RB_EVNT_MIN_SIZE	8U	/* two 32bit words */ -#if !defined(CONFIG_64BIT) || defined(CONFIG_HAVE_EFFICIENT_UNALIGNED_ACCESS) +#ifndef CONFIG_HAVE_64BIT_ALIGNED_ACCESS  # define RB_FORCE_8BYTE_ALIGNMENT	0  # define RB_ARCH_ALIGNMENT		RB_ALIGNMENT  #else @@ -216,6 +189,8 @@ EXPORT_SYMBOL_GPL(tracing_is_on);  # define RB_ARCH_ALIGNMENT		8U  #endif +#define RB_ALIGN_DATA		__aligned(RB_ARCH_ALIGNMENT) +  /* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */  #define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX @@ -364,7 +339,7 @@ EXPORT_SYMBOL_GPL(ring_buffer_event_data);  struct buffer_data_page {  	u64		 time_stamp;	/* page time stamp */  	local_t		 commit;	/* write committed index */ -	unsigned char	 data[];	/* data of buffer page */ +	unsigned char	 data[] RB_ALIGN_DATA;	/* data of buffer page */  };  /* @@ -472,6 +447,12 @@ int ring_buffer_print_page_header(struct trace_seq *s)  	return ret;  } +struct rb_irq_work { +	struct irq_work			work; +	wait_queue_head_t		waiters; +	bool				waiters_pending; +}; +  /*   * head_page == tail_page && head == tail then buffer is empty.   
*/ @@ -479,9 +460,10 @@ struct ring_buffer_per_cpu {  	int				cpu;  	atomic_t			record_disabled;  	struct ring_buffer		*buffer; -	spinlock_t			reader_lock;	/* serialize readers */ +	raw_spinlock_t			reader_lock;	/* serialize readers */  	arch_spinlock_t			lock;  	struct lock_class_key		lock_key; +	unsigned int			nr_pages;  	struct list_head		*pages;  	struct buffer_page		*head_page;	/* read from head */  	struct buffer_page		*tail_page;	/* write to tail */ @@ -489,21 +471,31 @@ struct ring_buffer_per_cpu {  	struct buffer_page		*reader_page;  	unsigned long			lost_events;  	unsigned long			last_overrun; -	local_t				commit_overrun; -	local_t				overrun; +	local_t				entries_bytes;  	local_t				entries; +	local_t				overrun; +	local_t				commit_overrun; +	local_t				dropped_events;  	local_t				committing;  	local_t				commits;  	unsigned long			read; +	unsigned long			read_bytes;  	u64				write_stamp;  	u64				read_stamp; +	/* ring buffer pages to update, > 0 to add, < 0 to remove */ +	int				nr_pages_to_update; +	struct list_head		new_pages; /* new pages to add */ +	struct work_struct		update_pages_work; +	struct completion		update_done; + +	struct rb_irq_work		irq_work;  };  struct ring_buffer { -	unsigned			pages;  	unsigned			flags;  	int				cpus;  	atomic_t			record_disabled; +	atomic_t			resize_disabled;  	cpumask_var_t			cpumask;  	struct lock_class_key		*reader_lock_key; @@ -516,6 +508,8 @@ struct ring_buffer {  	struct notifier_block		cpu_notify;  #endif  	u64				(*clock)(void); + +	struct rb_irq_work		irq_work;  };  struct ring_buffer_iter { @@ -527,6 +521,120 @@ struct ring_buffer_iter {  	u64				read_stamp;  }; +/* + * rb_wake_up_waiters - wake up tasks waiting for ring buffer input + * + * Schedules a delayed work to wake up any task that is blocked on the + * ring buffer waiters queue. + */ +static void rb_wake_up_waiters(struct irq_work *work) +{ +	struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work); + +	wake_up_all(&rbwork->waiters); +} + +/** + * ring_buffer_wait - wait for input to the ring buffer + * @buffer: buffer to wait on + * @cpu: the cpu buffer to wait on + * + * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon + * as data is added to any of the @buffer's cpu buffers. Otherwise + * it will wait for data to be added to a specific cpu buffer. + */ +int ring_buffer_wait(struct ring_buffer *buffer, int cpu) +{ +	struct ring_buffer_per_cpu *cpu_buffer; +	DEFINE_WAIT(wait); +	struct rb_irq_work *work; + +	/* +	 * Depending on what the caller is waiting for, either any +	 * data in any cpu buffer, or a specific buffer, put the +	 * caller on the appropriate wait queue. +	 */ +	if (cpu == RING_BUFFER_ALL_CPUS) +		work = &buffer->irq_work; +	else { +		if (!cpumask_test_cpu(cpu, buffer->cpumask)) +			return -ENODEV; +		cpu_buffer = buffer->buffers[cpu]; +		work = &cpu_buffer->irq_work; +	} + + +	prepare_to_wait(&work->waiters, &wait, TASK_INTERRUPTIBLE); + +	/* +	 * The events can happen in critical sections where +	 * checking a work queue can cause deadlocks. +	 * After adding a task to the queue, this flag is set +	 * only to notify events to try to wake up the queue +	 * using irq_work. +	 * +	 * We don't clear it even if the buffer is no longer +	 * empty. The flag only causes the next event to run +	 * irq_work to do the work queue wake up. The worse +	 * that can happen if we race with !trace_empty() is that +	 * an event will cause an irq_work to try to wake up +	 * an empty queue. 
+	 * +	 * There's no reason to protect this flag either, as +	 * the work queue and irq_work logic will do the necessary +	 * synchronization for the wake ups. The only thing +	 * that is necessary is that the wake up happens after +	 * a task has been queued. It's OK for spurious wake ups. +	 */ +	work->waiters_pending = true; + +	if ((cpu == RING_BUFFER_ALL_CPUS && ring_buffer_empty(buffer)) || +	    (cpu != RING_BUFFER_ALL_CPUS && ring_buffer_empty_cpu(buffer, cpu))) +		schedule(); + +	finish_wait(&work->waiters, &wait); +	return 0; +} + +/** + * ring_buffer_poll_wait - poll on buffer input + * @buffer: buffer to wait on + * @cpu: the cpu buffer to wait on + * @filp: the file descriptor + * @poll_table: The poll descriptor + * + * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon + * as data is added to any of the @buffer's cpu buffers. Otherwise + * it will wait for data to be added to a specific cpu buffer. + * + * Returns POLLIN | POLLRDNORM if data exists in the buffers, + * zero otherwise. + */ +int ring_buffer_poll_wait(struct ring_buffer *buffer, int cpu, +			  struct file *filp, poll_table *poll_table) +{ +	struct ring_buffer_per_cpu *cpu_buffer; +	struct rb_irq_work *work; + +	if (cpu == RING_BUFFER_ALL_CPUS) +		work = &buffer->irq_work; +	else { +		if (!cpumask_test_cpu(cpu, buffer->cpumask)) +			return -EINVAL; + +		cpu_buffer = buffer->buffers[cpu]; +		work = &cpu_buffer->irq_work; +	} + +	work->waiters_pending = true; +	poll_wait(filp, &work->waiters, poll_table); + +	if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) || +	    (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu))) +		return POLLIN | POLLRDNORM; +	return 0; +} +  /* buffer may be either ring_buffer or ring_buffer_per_cpu */  #define RB_WARN_ON(b, cond)						\  	({								\ @@ -669,7 +777,7 @@ static struct list_head *rb_list_head(struct list_head *list)   * the reader page). But if the next page is a header page,   * its flags will be non zero.   */ -static int inline +static inline int  rb_is_head_page(struct ring_buffer_per_cpu *cpu_buffer,  		struct buffer_page *page, struct list_head *list)  { @@ -957,7 +1065,7 @@ static int rb_check_list(struct ring_buffer_per_cpu *cpu_buffer,  }  /** - * check_pages - integrity check of buffer pages + * rb_check_pages - integrity check of buffer pages   * @cpu_buffer: CPU buffer with pages to test   *   * As a safety measure we check to make sure the data pages have not @@ -968,6 +1076,10 @@ static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)  	struct list_head *head = cpu_buffer->pages;  	struct buffer_page *bpage, *tmp; +	/* Reset the head page if it exists */ +	if (cpu_buffer->head_page) +		rb_set_head_page(cpu_buffer); +  	rb_head_page_deactivate(cpu_buffer);  	if (RB_WARN_ON(cpu_buffer, head->next->prev != head)) @@ -994,33 +1106,55 @@ static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)  	return 0;  } -static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer, -			     unsigned nr_pages) +static int __rb_allocate_pages(int nr_pages, struct list_head *pages, int cpu)  { +	int i;  	struct buffer_page *bpage, *tmp; -	unsigned long addr; -	LIST_HEAD(pages); -	unsigned i; - -	WARN_ON(!nr_pages);  	for (i = 0; i < nr_pages; i++) { +		struct page *page; +		/* +		 * __GFP_NORETRY flag makes sure that the allocation fails +		 * gracefully without invoking oom-killer and the system is +		 * not destabilized. 
+		 */  		bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()), -				    GFP_KERNEL, cpu_to_node(cpu_buffer->cpu)); +				    GFP_KERNEL | __GFP_NORETRY, +				    cpu_to_node(cpu));  		if (!bpage)  			goto free_pages; -		rb_check_bpage(cpu_buffer, bpage); - -		list_add(&bpage->list, &pages); +		list_add(&bpage->list, pages); -		addr = __get_free_page(GFP_KERNEL); -		if (!addr) +		page = alloc_pages_node(cpu_to_node(cpu), +					GFP_KERNEL | __GFP_NORETRY, 0); +		if (!page)  			goto free_pages; -		bpage->page = (void *)addr; +		bpage->page = page_address(page);  		rb_init_page(bpage->page);  	} +	return 0; + +free_pages: +	list_for_each_entry_safe(bpage, tmp, pages, list) { +		list_del_init(&bpage->list); +		free_buffer_page(bpage); +	} + +	return -ENOMEM; +} + +static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer, +			     unsigned nr_pages) +{ +	LIST_HEAD(pages); + +	WARN_ON(!nr_pages); + +	if (__rb_allocate_pages(nr_pages, &pages, cpu_buffer->cpu)) +		return -ENOMEM; +  	/*  	 * The ring buffer page list is a circular list that does not  	 * start and end with a list head. All page list items point to @@ -1029,24 +1163,19 @@ static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,  	cpu_buffer->pages = pages.next;  	list_del(&pages); +	cpu_buffer->nr_pages = nr_pages; +  	rb_check_pages(cpu_buffer);  	return 0; - - free_pages: -	list_for_each_entry_safe(bpage, tmp, &pages, list) { -		list_del_init(&bpage->list); -		free_buffer_page(bpage); -	} -	return -ENOMEM;  }  static struct ring_buffer_per_cpu * -rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu) +rb_allocate_cpu_buffer(struct ring_buffer *buffer, int nr_pages, int cpu)  {  	struct ring_buffer_per_cpu *cpu_buffer;  	struct buffer_page *bpage; -	unsigned long addr; +	struct page *page;  	int ret;  	cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()), @@ -1056,9 +1185,13 @@ rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)  	cpu_buffer->cpu = cpu;  	cpu_buffer->buffer = buffer; -	spin_lock_init(&cpu_buffer->reader_lock); +	raw_spin_lock_init(&cpu_buffer->reader_lock);  	lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key);  	cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED; +	INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler); +	init_completion(&cpu_buffer->update_done); +	init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters); +	init_waitqueue_head(&cpu_buffer->irq_work.waiters);  	bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),  			    GFP_KERNEL, cpu_to_node(cpu)); @@ -1068,15 +1201,16 @@ rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)  	rb_check_bpage(cpu_buffer, bpage);  	cpu_buffer->reader_page = bpage; -	addr = __get_free_page(GFP_KERNEL); -	if (!addr) +	page = alloc_pages_node(cpu_to_node(cpu), GFP_KERNEL, 0); +	if (!page)  		goto fail_free_reader; -	bpage->page = (void *)addr; +	bpage->page = page_address(page);  	rb_init_page(bpage->page);  	INIT_LIST_HEAD(&cpu_buffer->reader_page->list); +	INIT_LIST_HEAD(&cpu_buffer->new_pages); -	ret = rb_allocate_pages(cpu_buffer, buffer->pages); +	ret = rb_allocate_pages(cpu_buffer, nr_pages);  	if (ret < 0)  		goto fail_free_reader; @@ -1123,7 +1257,7 @@ static int rb_cpu_notify(struct notifier_block *self,  #endif  /** - * ring_buffer_alloc - allocate a new ring_buffer + * __ring_buffer_alloc - allocate a new ring_buffer   * @size: the size in bytes per cpu that is needed.   * @flags: attributes to set for the ring buffer.   
* @@ -1137,7 +1271,7 @@ struct ring_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags,  {  	struct ring_buffer *buffer;  	int bsize; -	int cpu; +	int cpu, nr_pages;  	/* keep it in its own cache line */  	buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()), @@ -1148,14 +1282,17 @@ struct ring_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags,  	if (!alloc_cpumask_var(&buffer->cpumask, GFP_KERNEL))  		goto fail_free_buffer; -	buffer->pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE); +	nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);  	buffer->flags = flags;  	buffer->clock = trace_clock_local;  	buffer->reader_lock_key = key; +	init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters); +	init_waitqueue_head(&buffer->irq_work.waiters); +  	/* need at least two pages */ -	if (buffer->pages < 2) -		buffer->pages = 2; +	if (nr_pages < 2) +		nr_pages = 2;  	/*  	 * In case of non-hotplug cpu, if the ring-buffer is allocated @@ -1163,7 +1300,7 @@ struct ring_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags,  	 * In that off case, we need to allocate for all possible cpus.  	 */  #ifdef CONFIG_HOTPLUG_CPU -	get_online_cpus(); +	cpu_notifier_register_begin();  	cpumask_copy(buffer->cpumask, cpu_online_mask);  #else  	cpumask_copy(buffer->cpumask, cpu_possible_mask); @@ -1178,7 +1315,7 @@ struct ring_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags,  	for_each_buffer_cpu(buffer, cpu) {  		buffer->buffers[cpu] = -			rb_allocate_cpu_buffer(buffer, cpu); +			rb_allocate_cpu_buffer(buffer, nr_pages, cpu);  		if (!buffer->buffers[cpu])  			goto fail_free_buffers;  	} @@ -1186,10 +1323,10 @@ struct ring_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags,  #ifdef CONFIG_HOTPLUG_CPU  	buffer->cpu_notify.notifier_call = rb_cpu_notify;  	buffer->cpu_notify.priority = 0; -	register_cpu_notifier(&buffer->cpu_notify); +	__register_cpu_notifier(&buffer->cpu_notify); +	cpu_notifier_register_done();  #endif -	put_online_cpus();  	mutex_init(&buffer->mutex);  	return buffer; @@ -1203,7 +1340,9 @@ struct ring_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags,   fail_free_cpumask:  	free_cpumask_var(buffer->cpumask); -	put_online_cpus(); +#ifdef CONFIG_HOTPLUG_CPU +	cpu_notifier_register_done(); +#endif   fail_free_buffer:  	kfree(buffer); @@ -1220,16 +1359,17 @@ ring_buffer_free(struct ring_buffer *buffer)  {  	int cpu; -	get_online_cpus(); -  #ifdef CONFIG_HOTPLUG_CPU -	unregister_cpu_notifier(&buffer->cpu_notify); +	cpu_notifier_register_begin(); +	__unregister_cpu_notifier(&buffer->cpu_notify);  #endif  	for_each_buffer_cpu(buffer, cpu)  		rb_free_cpu_buffer(buffer->buffers[cpu]); -	put_online_cpus(); +#ifdef CONFIG_HOTPLUG_CPU +	cpu_notifier_register_done(); +#endif  	kfree(buffer->buffers);  	free_cpumask_var(buffer->cpumask); @@ -1246,78 +1386,241 @@ void ring_buffer_set_clock(struct ring_buffer *buffer,  static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer); -static void -rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned nr_pages) +static inline unsigned long rb_page_entries(struct buffer_page *bpage)  { -	struct buffer_page *bpage; -	struct list_head *p; -	unsigned i; +	return local_read(&bpage->entries) & RB_WRITE_MASK; +} -	spin_lock_irq(&cpu_buffer->reader_lock); -	rb_head_page_deactivate(cpu_buffer); +static inline unsigned long rb_page_write(struct buffer_page *bpage) +{ +	return local_read(&bpage->write) & RB_WRITE_MASK; +} -	for (i = 0; i < nr_pages; i++) { -		if (RB_WARN_ON(cpu_buffer, 
list_empty(cpu_buffer->pages))) -			goto out; -		p = cpu_buffer->pages->next; -		bpage = list_entry(p, struct buffer_page, list); -		list_del_init(&bpage->list); -		free_buffer_page(bpage); +static int +rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned int nr_pages) +{ +	struct list_head *tail_page, *to_remove, *next_page; +	struct buffer_page *to_remove_page, *tmp_iter_page; +	struct buffer_page *last_page, *first_page; +	unsigned int nr_removed; +	unsigned long head_bit; +	int page_entries; + +	head_bit = 0; + +	raw_spin_lock_irq(&cpu_buffer->reader_lock); +	atomic_inc(&cpu_buffer->record_disabled); +	/* +	 * We don't race with the readers since we have acquired the reader +	 * lock. We also don't race with writers after disabling recording. +	 * This makes it easy to figure out the first and the last page to be +	 * removed from the list. We unlink all the pages in between including +	 * the first and last pages. This is done in a busy loop so that we +	 * lose the least number of traces. +	 * The pages are freed after we restart recording and unlock readers. +	 */ +	tail_page = &cpu_buffer->tail_page->list; + +	/* +	 * tail page might be on reader page, we remove the next page +	 * from the ring buffer +	 */ +	if (cpu_buffer->tail_page == cpu_buffer->reader_page) +		tail_page = rb_list_head(tail_page->next); +	to_remove = tail_page; + +	/* start of pages to remove */ +	first_page = list_entry(rb_list_head(to_remove->next), +				struct buffer_page, list); + +	for (nr_removed = 0; nr_removed < nr_pages; nr_removed++) { +		to_remove = rb_list_head(to_remove)->next; +		head_bit |= (unsigned long)to_remove & RB_PAGE_HEAD;  	} -	if (RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages))) -		goto out; -	rb_reset_cpu(cpu_buffer); -	rb_check_pages(cpu_buffer); +	next_page = rb_list_head(to_remove)->next; -out: -	spin_unlock_irq(&cpu_buffer->reader_lock); +	/* +	 * Now we remove all pages between tail_page and next_page. +	 * Make sure that we have head_bit value preserved for the +	 * next page +	 */ +	tail_page->next = (struct list_head *)((unsigned long)next_page | +						head_bit); +	next_page = rb_list_head(next_page); +	next_page->prev = tail_page; + +	/* make sure pages points to a valid page in the ring buffer */ +	cpu_buffer->pages = next_page; + +	/* update head page */ +	if (head_bit) +		cpu_buffer->head_page = list_entry(next_page, +						struct buffer_page, list); + +	/* +	 * change read pointer to make sure any read iterators reset +	 * themselves +	 */ +	cpu_buffer->read = 0; + +	/* pages are removed, resume tracing and then free the pages */ +	atomic_dec(&cpu_buffer->record_disabled); +	raw_spin_unlock_irq(&cpu_buffer->reader_lock); + +	RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages)); + +	/* last buffer page to remove */ +	last_page = list_entry(rb_list_head(to_remove), struct buffer_page, +				list); +	tmp_iter_page = first_page; + +	do { +		to_remove_page = tmp_iter_page; +		rb_inc_page(cpu_buffer, &tmp_iter_page); + +		/* update the counters */ +		page_entries = rb_page_entries(to_remove_page); +		if (page_entries) { +			/* +			 * If something was added to this page, it was full +			 * since it is not the tail page. So we deduct the +			 * bytes consumed in ring buffer from here. +			 * Increment overrun to account for the lost events. 
+			 */ +			local_add(page_entries, &cpu_buffer->overrun); +			local_sub(BUF_PAGE_SIZE, &cpu_buffer->entries_bytes); +		} + +		/* +		 * We have already removed references to this list item, just +		 * free up the buffer_page and its page +		 */ +		free_buffer_page(to_remove_page); +		nr_removed--; + +	} while (to_remove_page != last_page); + +	RB_WARN_ON(cpu_buffer, nr_removed); + +	return nr_removed == 0;  } -static void -rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer, -		struct list_head *pages, unsigned nr_pages) +static int +rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer)  { -	struct buffer_page *bpage; -	struct list_head *p; -	unsigned i; +	struct list_head *pages = &cpu_buffer->new_pages; +	int retries, success; -	spin_lock_irq(&cpu_buffer->reader_lock); -	rb_head_page_deactivate(cpu_buffer); +	raw_spin_lock_irq(&cpu_buffer->reader_lock); +	/* +	 * We are holding the reader lock, so the reader page won't be swapped +	 * in the ring buffer. Now we are racing with the writer trying to +	 * move head page and the tail page. +	 * We are going to adapt the reader page update process where: +	 * 1. We first splice the start and end of list of new pages between +	 *    the head page and its previous page. +	 * 2. We cmpxchg the prev_page->next to point from head page to the +	 *    start of new pages list. +	 * 3. Finally, we update the head->prev to the end of new list. +	 * +	 * We will try this process 10 times, to make sure that we don't keep +	 * spinning. +	 */ +	retries = 10; +	success = 0; +	while (retries--) { +		struct list_head *head_page, *prev_page, *r; +		struct list_head *last_page, *first_page; +		struct list_head *head_page_with_bit; -	for (i = 0; i < nr_pages; i++) { -		if (RB_WARN_ON(cpu_buffer, list_empty(pages))) -			goto out; -		p = pages->next; -		bpage = list_entry(p, struct buffer_page, list); -		list_del_init(&bpage->list); -		list_add_tail(&bpage->list, cpu_buffer->pages); +		head_page = &rb_set_head_page(cpu_buffer)->list; +		if (!head_page) +			break; +		prev_page = head_page->prev; + +		first_page = pages->next; +		last_page  = pages->prev; + +		head_page_with_bit = (struct list_head *) +				     ((unsigned long)head_page | RB_PAGE_HEAD); + +		last_page->next = head_page_with_bit; +		first_page->prev = prev_page; + +		r = cmpxchg(&prev_page->next, head_page_with_bit, first_page); + +		if (r == head_page_with_bit) { +			/* +			 * yay, we replaced the page pointer to our new list, +			 * now, we just have to update to head page's prev +			 * pointer to point to end of list +			 */ +			head_page->prev = last_page; +			success = 1; +			break; +		}  	} -	rb_reset_cpu(cpu_buffer); -	rb_check_pages(cpu_buffer); -out: -	spin_unlock_irq(&cpu_buffer->reader_lock); +	if (success) +		INIT_LIST_HEAD(pages); +	/* +	 * If we weren't successful in adding in new pages, warn and stop +	 * tracing +	 */ +	RB_WARN_ON(cpu_buffer, !success); +	raw_spin_unlock_irq(&cpu_buffer->reader_lock); + +	/* free pages if they weren't inserted */ +	if (!success) { +		struct buffer_page *bpage, *tmp; +		list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, +					 list) { +			list_del_init(&bpage->list); +			free_buffer_page(bpage); +		} +	} +	return success; +} + +static void rb_update_pages(struct ring_buffer_per_cpu *cpu_buffer) +{ +	int success; + +	if (cpu_buffer->nr_pages_to_update > 0) +		success = rb_insert_pages(cpu_buffer); +	else +		success = rb_remove_pages(cpu_buffer, +					-cpu_buffer->nr_pages_to_update); + +	if (success) +		cpu_buffer->nr_pages += 
cpu_buffer->nr_pages_to_update; +} + +static void update_pages_handler(struct work_struct *work) +{ +	struct ring_buffer_per_cpu *cpu_buffer = container_of(work, +			struct ring_buffer_per_cpu, update_pages_work); +	rb_update_pages(cpu_buffer); +	complete(&cpu_buffer->update_done);  }  /**   * ring_buffer_resize - resize the ring buffer   * @buffer: the buffer to resize.   * @size: the new size. + * @cpu_id: the cpu buffer to resize   *   * Minimum size is 2 * BUF_PAGE_SIZE.   * - * Returns -1 on failure. + * Returns 0 on success and < 0 on failure.   */ -int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size) +int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size, +			int cpu_id)  {  	struct ring_buffer_per_cpu *cpu_buffer; -	unsigned nr_pages, rm_pages, new_pages; -	struct buffer_page *bpage, *tmp; -	unsigned long buffer_size; -	unsigned long addr; -	LIST_HEAD(pages); -	int i, cpu; +	unsigned nr_pages; +	int cpu, err = 0;  	/*  	 * Always succeed at resizing a non-existent buffer: @@ -1325,109 +1628,199 @@ int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)  	if (!buffer)  		return size; +	/* Make sure the requested buffer exists */ +	if (cpu_id != RING_BUFFER_ALL_CPUS && +	    !cpumask_test_cpu(cpu_id, buffer->cpumask)) +		return size; +  	size = DIV_ROUND_UP(size, BUF_PAGE_SIZE);  	size *= BUF_PAGE_SIZE; -	buffer_size = buffer->pages * BUF_PAGE_SIZE;  	/* we need a minimum of two pages */  	if (size < BUF_PAGE_SIZE * 2)  		size = BUF_PAGE_SIZE * 2; -	if (size == buffer_size) -		return size; - -	atomic_inc(&buffer->record_disabled); +	nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE); -	/* Make sure all writers are done with this buffer. */ -	synchronize_sched(); +	/* +	 * Don't succeed if resizing is disabled, as a reader might be +	 * manipulating the ring buffer and is expecting a sane state while +	 * this is true. +	 */ +	if (atomic_read(&buffer->resize_disabled)) +		return -EBUSY; +	/* prevent another thread from changing buffer sizes */  	mutex_lock(&buffer->mutex); -	get_online_cpus(); - -	nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE); -	if (size < buffer_size) { +	if (cpu_id == RING_BUFFER_ALL_CPUS) { +		/* calculate the pages to update */ +		for_each_buffer_cpu(buffer, cpu) { +			cpu_buffer = buffer->buffers[cpu]; -		/* easy case, just free pages */ -		if (RB_WARN_ON(buffer, nr_pages >= buffer->pages)) -			goto out_fail; +			cpu_buffer->nr_pages_to_update = nr_pages - +							cpu_buffer->nr_pages; +			/* +			 * nothing more to do for removing pages or no update +			 */ +			if (cpu_buffer->nr_pages_to_update <= 0) +				continue; +			/* +			 * to add pages, make sure all new pages can be +			 * allocated without receiving ENOMEM +			 */ +			INIT_LIST_HEAD(&cpu_buffer->new_pages); +			if (__rb_allocate_pages(cpu_buffer->nr_pages_to_update, +						&cpu_buffer->new_pages, cpu)) { +				/* not enough memory for new pages */ +				err = -ENOMEM; +				goto out_err; +			} +		} -		rm_pages = buffer->pages - nr_pages; +		get_online_cpus(); +		/* +		 * Fire off all the required work handlers +		 * We can't schedule on offline CPUs, but it's not necessary +		 * since we can change their buffer sizes without any race. +		 */ +		for_each_buffer_cpu(buffer, cpu) { +			cpu_buffer = buffer->buffers[cpu]; +			if (!cpu_buffer->nr_pages_to_update) +				continue; + +			/* The update must run on the CPU that is being updated. 
*/ +			preempt_disable(); +			if (cpu == smp_processor_id() || !cpu_online(cpu)) { +				rb_update_pages(cpu_buffer); +				cpu_buffer->nr_pages_to_update = 0; +			} else { +				/* +				 * Can not disable preemption for schedule_work_on() +				 * on PREEMPT_RT. +				 */ +				preempt_enable(); +				schedule_work_on(cpu, +						&cpu_buffer->update_pages_work); +				preempt_disable(); +			} +			preempt_enable(); +		} +		/* wait for all the updates to complete */  		for_each_buffer_cpu(buffer, cpu) {  			cpu_buffer = buffer->buffers[cpu]; -			rb_remove_pages(cpu_buffer, rm_pages); +			if (!cpu_buffer->nr_pages_to_update) +				continue; + +			if (cpu_online(cpu)) +				wait_for_completion(&cpu_buffer->update_done); +			cpu_buffer->nr_pages_to_update = 0;  		} -		goto out; -	} -	/* -	 * This is a bit more difficult. We only want to add pages -	 * when we can allocate enough for all CPUs. We do this -	 * by allocating all the pages and storing them on a local -	 * link list. If we succeed in our allocation, then we -	 * add these pages to the cpu_buffers. Otherwise we just free -	 * them all and return -ENOMEM; -	 */ -	if (RB_WARN_ON(buffer, nr_pages <= buffer->pages)) -		goto out_fail; +		put_online_cpus(); +	} else { +		/* Make sure this CPU has been intitialized */ +		if (!cpumask_test_cpu(cpu_id, buffer->cpumask)) +			goto out; -	new_pages = nr_pages - buffer->pages; +		cpu_buffer = buffer->buffers[cpu_id]; -	for_each_buffer_cpu(buffer, cpu) { -		for (i = 0; i < new_pages; i++) { -			bpage = kzalloc_node(ALIGN(sizeof(*bpage), -						  cache_line_size()), -					    GFP_KERNEL, cpu_to_node(cpu)); -			if (!bpage) -				goto free_pages; -			list_add(&bpage->list, &pages); -			addr = __get_free_page(GFP_KERNEL); -			if (!addr) -				goto free_pages; -			bpage->page = (void *)addr; -			rb_init_page(bpage->page); +		if (nr_pages == cpu_buffer->nr_pages) +			goto out; + +		cpu_buffer->nr_pages_to_update = nr_pages - +						cpu_buffer->nr_pages; + +		INIT_LIST_HEAD(&cpu_buffer->new_pages); +		if (cpu_buffer->nr_pages_to_update > 0 && +			__rb_allocate_pages(cpu_buffer->nr_pages_to_update, +					    &cpu_buffer->new_pages, cpu_id)) { +			err = -ENOMEM; +			goto out_err;  		} -	} -	for_each_buffer_cpu(buffer, cpu) { -		cpu_buffer = buffer->buffers[cpu]; -		rb_insert_pages(cpu_buffer, &pages, new_pages); -	} +		get_online_cpus(); -	if (RB_WARN_ON(buffer, !list_empty(&pages))) -		goto out_fail; +		preempt_disable(); +		/* The update must run on the CPU that is being updated. */ +		if (cpu_id == smp_processor_id() || !cpu_online(cpu_id)) +			rb_update_pages(cpu_buffer); +		else { +			/* +			 * Can not disable preemption for schedule_work_on() +			 * on PREEMPT_RT. +			 */ +			preempt_enable(); +			schedule_work_on(cpu_id, +					 &cpu_buffer->update_pages_work); +			wait_for_completion(&cpu_buffer->update_done); +			preempt_disable(); +		} +		preempt_enable(); + +		cpu_buffer->nr_pages_to_update = 0; +		put_online_cpus(); +	}   out: -	buffer->pages = nr_pages; -	put_online_cpus(); +	/* +	 * The ring buffer resize can happen with the ring buffer +	 * enabled, so that the update disturbs the tracing as little +	 * as possible. But if the buffer is disabled, we do not need +	 * to worry about that, and we can take the time to verify +	 * that the buffer is not corrupt. +	 */ +	if (atomic_read(&buffer->record_disabled)) { +		atomic_inc(&buffer->record_disabled); +		/* +		 * Even though the buffer was disabled, we must make sure +		 * that it is truly disabled before calling rb_check_pages. 
+		 * There could have been a race between checking +		 * record_disable and incrementing it. +		 */ +		synchronize_sched(); +		for_each_buffer_cpu(buffer, cpu) { +			cpu_buffer = buffer->buffers[cpu]; +			rb_check_pages(cpu_buffer); +		} +		atomic_dec(&buffer->record_disabled); +	} +  	mutex_unlock(&buffer->mutex); +	return size; -	atomic_dec(&buffer->record_disabled); + out_err: +	for_each_buffer_cpu(buffer, cpu) { +		struct buffer_page *bpage, *tmp; -	return size; +		cpu_buffer = buffer->buffers[cpu]; +		cpu_buffer->nr_pages_to_update = 0; - free_pages: -	list_for_each_entry_safe(bpage, tmp, &pages, list) { -		list_del_init(&bpage->list); -		free_buffer_page(bpage); +		if (list_empty(&cpu_buffer->new_pages)) +			continue; + +		list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, +					list) { +			list_del_init(&bpage->list); +			free_buffer_page(bpage); +		}  	} -	put_online_cpus();  	mutex_unlock(&buffer->mutex); -	atomic_dec(&buffer->record_disabled); -	return -ENOMEM; +	return err; +} +EXPORT_SYMBOL_GPL(ring_buffer_resize); -	/* -	 * Something went totally wrong, and we are too paranoid -	 * to even clean up the mess. -	 */ - out_fail: -	put_online_cpus(); +void ring_buffer_change_overwrite(struct ring_buffer *buffer, int val) +{ +	mutex_lock(&buffer->mutex); +	if (val) +		buffer->flags |= RB_FL_OVERWRITE; +	else +		buffer->flags &= ~RB_FL_OVERWRITE;  	mutex_unlock(&buffer->mutex); -	atomic_dec(&buffer->record_disabled); -	return -1;  } -EXPORT_SYMBOL_GPL(ring_buffer_resize); +EXPORT_SYMBOL_GPL(ring_buffer_change_overwrite);  static inline void *  __rb_data_page_index(struct buffer_data_page *bpage, unsigned index) @@ -1453,22 +1846,12 @@ rb_iter_head_event(struct ring_buffer_iter *iter)  	return __rb_page_index(iter->head_page, iter->head);  } -static inline unsigned long rb_page_write(struct buffer_page *bpage) -{ -	return local_read(&bpage->write) & RB_WRITE_MASK; -} -  static inline unsigned rb_page_commit(struct buffer_page *bpage)  {  	return local_read(&bpage->page->commit);  } -static inline unsigned long rb_page_entries(struct buffer_page *bpage) -{ -	return local_read(&bpage->entries) & RB_WRITE_MASK; -} - -/* Size is determined by what has been commited */ +/* Size is determined by what has been committed */  static inline unsigned rb_page_size(struct buffer_page *bpage)  {  	return rb_page_commit(bpage); @@ -1516,7 +1899,7 @@ rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)  	 * assign the commit to the tail.  	 */   again: -	max_count = cpu_buffer->buffer->pages * 100; +	max_count = cpu_buffer->nr_pages * 100;  	while (cpu_buffer->commit_page != cpu_buffer->tail_page) {  		if (RB_WARN_ON(cpu_buffer, !(--max_count))) @@ -1600,7 +1983,7 @@ rb_add_time_stamp(struct ring_buffer_event *event, u64 delta)  }  /** - * ring_buffer_update_event - update event type and data + * rb_update_event - update event type and data   * @event: the even to update   * @type: the type of event   * @length: the size of the event field in the ring buffer @@ -1684,6 +2067,7 @@ rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer,  		 * the counters.  		 
*/  		local_add(entries, &cpu_buffer->overrun); +		local_sub(BUF_PAGE_SIZE, &cpu_buffer->entries_bytes);  		/*  		 * The entries will be zeroed out when we move the @@ -1839,6 +2223,9 @@ rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer,  	event = __rb_page_index(tail_page, tail);  	kmemcheck_annotate_bitfield(event, bitfield); +	/* account for padding bytes */ +	local_add(BUF_PAGE_SIZE - tail, &cpu_buffer->entries_bytes); +  	/*  	 * Save the original length to the meta data.  	 * This will be used by the reader to add lost event @@ -1931,8 +2318,10 @@ rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer,  			 * If we are not in overwrite mode,  			 * this is easy, just stop here.  			 */ -			if (!(buffer->flags & RB_FL_OVERWRITE)) +			if (!(buffer->flags & RB_FL_OVERWRITE)) { +				local_inc(&cpu_buffer->dropped_events);  				goto out_reset; +			}  			ret = rb_handle_head_page(cpu_buffer,  						  tail_page, @@ -2010,6 +2399,13 @@ __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,  	write &= RB_WRITE_MASK;  	tail = write - length; +	/* +	 * If this is the first commit on the page, then it has the same +	 * timestamp as the page itself. +	 */ +	if (!tail) +		delta = 0; +  	/* See if we shot pass the end of this buffer page */  	if (unlikely(write > BUF_PAGE_SIZE))  		return rb_move_tail(cpu_buffer, length, tail, @@ -2030,6 +2426,9 @@ __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,  	if (!tail)  		tail_page->page->time_stamp = ts; +	/* account for these added bytes */ +	local_add(length, &cpu_buffer->entries_bytes); +  	return event;  } @@ -2052,6 +2451,7 @@ rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer,  	if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) {  		unsigned long write_mask =  			local_read(&bpage->write) & ~RB_WRITE_MASK; +		unsigned long event_length = rb_event_length(event);  		/*  		 * This is on the tail page. It is possible that  		 * a write could come in and move the tail page @@ -2061,8 +2461,11 @@ rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer,  		old_index += write_mask;  		new_index += write_mask;  		index = local_cmpxchg(&bpage->write, old_index, new_index); -		if (index == old_index) +		if (index == old_index) { +			/* update counters */ +			local_sub(event_length, &cpu_buffer->entries_bytes);  			return 1; +		}  	}  	/* could not discard */ @@ -2162,11 +2565,19 @@ rb_reserve_next_event(struct ring_buffer *buffer,  	if (likely(ts >= cpu_buffer->write_stamp)) {  		delta = diff;  		if (unlikely(test_time_stamp(delta))) { +			int local_clock_stable = 1; +#ifdef CONFIG_HAVE_UNSTABLE_SCHED_CLOCK +			local_clock_stable = sched_clock_stable(); +#endif  			WARN_ONCE(delta > (1ULL << 59), -				  KERN_WARNING "Delta way too big! %llu ts=%llu write stamp = %llu\n", +				  KERN_WARNING "Delta way too big! %llu ts=%llu write stamp = %llu\n%s",  				  (unsigned long long)delta,  				  (unsigned long long)ts, -				  (unsigned long long)cpu_buffer->write_stamp); +				  (unsigned long long)cpu_buffer->write_stamp, +				  local_clock_stable ? "" : +				  "If you just came from a suspend/resume,\n" +				  "please switch to the trace global clock:\n" +				  "  echo global > /sys/kernel/debug/tracing/trace_clock\n");  			add_timestamp = 1;  		}  	} @@ -2188,41 +2599,76 @@ rb_reserve_next_event(struct ring_buffer *buffer,  #ifdef CONFIG_TRACING -#define TRACE_RECURSIVE_DEPTH 16 +/* + * The lock and unlock are done within a preempt disable section. 
+ * The current_context per_cpu variable can only be modified + * by the current task between lock and unlock. But it can + * be modified more than once via an interrupt. To pass this + * information from the lock to the unlock without having to + * access the 'in_interrupt()' functions again (which do show + * a bit of overhead in something as critical as function tracing, + * we use a bitmask trick. + * + *  bit 0 =  NMI context + *  bit 1 =  IRQ context + *  bit 2 =  SoftIRQ context + *  bit 3 =  normal context. + * + * This works because this is the order of contexts that can + * preempt other contexts. A SoftIRQ never preempts an IRQ + * context. + * + * When the context is determined, the corresponding bit is + * checked and set (if it was set, then a recursion of that context + * happened). + * + * On unlock, we need to clear this bit. To do so, just subtract + * 1 from the current_context and AND it to itself. + * + * (binary) + *  101 - 1 = 100 + *  101 & 100 = 100 (clearing bit zero) + * + *  1010 - 1 = 1001 + *  1010 & 1001 = 1000 (clearing bit 1) + * + * The least significant bit can be cleared this way, and it + * just so happens that it is the same bit corresponding to + * the current context. + */ +static DEFINE_PER_CPU(unsigned int, current_context); -/* Keep this code out of the fast path cache */ -static noinline void trace_recursive_fail(void) +static __always_inline int trace_recursive_lock(void)  { -	/* Disable all tracing before we do anything else */ -	tracing_off_permanent(); +	unsigned int val = this_cpu_read(current_context); +	int bit; -	printk_once(KERN_WARNING "Tracing recursion: depth[%ld]:" -		    "HC[%lu]:SC[%lu]:NMI[%lu]\n", -		    current->trace_recursion, -		    hardirq_count() >> HARDIRQ_SHIFT, -		    softirq_count() >> SOFTIRQ_SHIFT, -		    in_nmi()); - -	WARN_ON_ONCE(1); -} - -static inline int trace_recursive_lock(void) -{ -	current->trace_recursion++; +	if (in_interrupt()) { +		if (in_nmi()) +			bit = 0; +		else if (in_irq()) +			bit = 1; +		else +			bit = 2; +	} else +		bit = 3; -	if (likely(current->trace_recursion < TRACE_RECURSIVE_DEPTH)) -		return 0; +	if (unlikely(val & (1 << bit))) +		return 1; -	trace_recursive_fail(); +	val |= (1 << bit); +	this_cpu_write(current_context, val); -	return -1; +	return 0;  } -static inline void trace_recursive_unlock(void) +static __always_inline void trace_recursive_unlock(void)  { -	WARN_ON_ONCE(!current->trace_recursion); +	unsigned int val = this_cpu_read(current_context); -	current->trace_recursion--; +	val--; +	val &= this_cpu_read(current_context); +	this_cpu_write(current_context, val);  }  #else @@ -2330,6 +2776,22 @@ static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,  	rb_end_commit(cpu_buffer);  } +static __always_inline void +rb_wakeups(struct ring_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer) +{ +	if (buffer->irq_work.waiters_pending) { +		buffer->irq_work.waiters_pending = false; +		/* irq_work_queue() supplies it's own memory barriers */ +		irq_work_queue(&buffer->irq_work.work); +	} + +	if (cpu_buffer->irq_work.waiters_pending) { +		cpu_buffer->irq_work.waiters_pending = false; +		/* irq_work_queue() supplies it's own memory barriers */ +		irq_work_queue(&cpu_buffer->irq_work.work); +	} +} +  /**   * ring_buffer_unlock_commit - commit a reserved   * @buffer: The buffer to commit to @@ -2349,6 +2811,8 @@ int ring_buffer_unlock_commit(struct ring_buffer *buffer,  	rb_commit(cpu_buffer, event); +	rb_wakeups(buffer, cpu_buffer); +  	trace_recursive_unlock();  	
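/*
 * Illustrative worked example (not part of this commit): the
 * trace_recursive_unlock() above clears the context bit that the matching
 * trace_recursive_lock() set, via val = (val - 1) & val, which always
 * clears the lowest set bit.  For an NMI nested inside an IRQ handler:
 *
 *	IRQ lock:	val = 0b0010		(bit 1 set)
 *	NMI lock:	val = 0b0011		(bit 0 added)
 *	NMI unlock:	(0b0011 - 1) & 0b0011 = 0b0010
 *	IRQ unlock:	(0b0010 - 1) & 0b0010 = 0b0000
 *
 * The lowest set bit always belongs to the most recently entered context,
 * so each unlock undoes exactly its own lock.
 */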
preempt_enable_notrace(); @@ -2481,8 +2945,8 @@ EXPORT_SYMBOL_GPL(ring_buffer_discard_commit);   * and not the length of the event which would hold the header.   */  int ring_buffer_write(struct ring_buffer *buffer, -			unsigned long length, -			void *data) +		      unsigned long length, +		      void *data)  {  	struct ring_buffer_per_cpu *cpu_buffer;  	struct ring_buffer_event *event; @@ -2521,6 +2985,8 @@ int ring_buffer_write(struct ring_buffer *buffer,  	rb_commit(cpu_buffer, event); +	rb_wakeups(buffer, cpu_buffer); +  	ret = 0;   out:  	preempt_enable_notrace(); @@ -2574,6 +3040,63 @@ void ring_buffer_record_enable(struct ring_buffer *buffer)  EXPORT_SYMBOL_GPL(ring_buffer_record_enable);  /** + * ring_buffer_record_off - stop all writes into the buffer + * @buffer: The ring buffer to stop writes to. + * + * This prevents all writes to the buffer. Any attempt to write + * to the buffer after this will fail and return NULL. + * + * This is different than ring_buffer_record_disable() as + * it works like an on/off switch, where as the disable() version + * must be paired with a enable(). + */ +void ring_buffer_record_off(struct ring_buffer *buffer) +{ +	unsigned int rd; +	unsigned int new_rd; + +	do { +		rd = atomic_read(&buffer->record_disabled); +		new_rd = rd | RB_BUFFER_OFF; +	} while (atomic_cmpxchg(&buffer->record_disabled, rd, new_rd) != rd); +} +EXPORT_SYMBOL_GPL(ring_buffer_record_off); + +/** + * ring_buffer_record_on - restart writes into the buffer + * @buffer: The ring buffer to start writes to. + * + * This enables all writes to the buffer that was disabled by + * ring_buffer_record_off(). + * + * This is different than ring_buffer_record_enable() as + * it works like an on/off switch, where as the enable() version + * must be paired with a disable(). + */ +void ring_buffer_record_on(struct ring_buffer *buffer) +{ +	unsigned int rd; +	unsigned int new_rd; + +	do { +		rd = atomic_read(&buffer->record_disabled); +		new_rd = rd & ~RB_BUFFER_OFF; +	} while (atomic_cmpxchg(&buffer->record_disabled, rd, new_rd) != rd); +} +EXPORT_SYMBOL_GPL(ring_buffer_record_on); + +/** + * ring_buffer_record_is_on - return true if the ring buffer can write + * @buffer: The ring buffer to see if write is enabled + * + * Returns true if the ring buffer is in a state that it accepts writes. + */ +int ring_buffer_record_is_on(struct ring_buffer *buffer) +{ +	return !atomic_read(&buffer->record_disabled); +} + +/**   * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer   * @buffer: The ring buffer to stop writes to.   * @cpu: The CPU buffer to stop @@ -2629,6 +3152,59 @@ rb_num_of_entries(struct ring_buffer_per_cpu *cpu_buffer)  }  /** + * ring_buffer_oldest_event_ts - get the oldest event timestamp from the buffer + * @buffer: The ring buffer + * @cpu: The per CPU buffer to read from. 
+ */ +u64 ring_buffer_oldest_event_ts(struct ring_buffer *buffer, int cpu) +{ +	unsigned long flags; +	struct ring_buffer_per_cpu *cpu_buffer; +	struct buffer_page *bpage; +	u64 ret = 0; + +	if (!cpumask_test_cpu(cpu, buffer->cpumask)) +		return 0; + +	cpu_buffer = buffer->buffers[cpu]; +	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); +	/* +	 * if the tail is on reader_page, oldest time stamp is on the reader +	 * page +	 */ +	if (cpu_buffer->tail_page == cpu_buffer->reader_page) +		bpage = cpu_buffer->reader_page; +	else +		bpage = rb_set_head_page(cpu_buffer); +	if (bpage) +		ret = bpage->page->time_stamp; +	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); + +	return ret; +} +EXPORT_SYMBOL_GPL(ring_buffer_oldest_event_ts); + +/** + * ring_buffer_bytes_cpu - get the number of bytes consumed in a cpu buffer + * @buffer: The ring buffer + * @cpu: The per CPU buffer to read from. + */ +unsigned long ring_buffer_bytes_cpu(struct ring_buffer *buffer, int cpu) +{ +	struct ring_buffer_per_cpu *cpu_buffer; +	unsigned long ret; + +	if (!cpumask_test_cpu(cpu, buffer->cpumask)) +		return 0; + +	cpu_buffer = buffer->buffers[cpu]; +	ret = local_read(&cpu_buffer->entries_bytes) - cpu_buffer->read_bytes; + +	return ret; +} +EXPORT_SYMBOL_GPL(ring_buffer_bytes_cpu); + +/**   * ring_buffer_entries_cpu - get the number of entries in a cpu buffer   * @buffer: The ring buffer   * @cpu: The per CPU buffer to get the entries from. @@ -2647,7 +3223,8 @@ unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu)  EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu);  /** - * ring_buffer_overrun_cpu - get the number of overruns in a cpu_buffer + * ring_buffer_overrun_cpu - get the number of overruns caused by the ring + * buffer wrapping around (only if RB_FL_OVERWRITE is on).   * @buffer: The ring buffer   * @cpu: The per CPU buffer to get the number of overruns from   */ @@ -2667,7 +3244,9 @@ unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu)  EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu);  /** - * ring_buffer_commit_overrun_cpu - get the number of overruns caused by commits + * ring_buffer_commit_overrun_cpu - get the number of overruns caused by + * commits failing due to the buffer wrapping around while there are uncommitted + * events, such as during an interrupt storm.   * @buffer: The ring buffer   * @cpu: The per CPU buffer to get the number of overruns from   */ @@ -2688,6 +3267,46 @@ ring_buffer_commit_overrun_cpu(struct ring_buffer *buffer, int cpu)  EXPORT_SYMBOL_GPL(ring_buffer_commit_overrun_cpu);  /** + * ring_buffer_dropped_events_cpu - get the number of dropped events caused by + * the ring buffer filling up (only if RB_FL_OVERWRITE is off). 
+ * @buffer: The ring buffer + * @cpu: The per CPU buffer to get the number of overruns from + */ +unsigned long +ring_buffer_dropped_events_cpu(struct ring_buffer *buffer, int cpu) +{ +	struct ring_buffer_per_cpu *cpu_buffer; +	unsigned long ret; + +	if (!cpumask_test_cpu(cpu, buffer->cpumask)) +		return 0; + +	cpu_buffer = buffer->buffers[cpu]; +	ret = local_read(&cpu_buffer->dropped_events); + +	return ret; +} +EXPORT_SYMBOL_GPL(ring_buffer_dropped_events_cpu); + +/** + * ring_buffer_read_events_cpu - get the number of events successfully read + * @buffer: The ring buffer + * @cpu: The per CPU buffer to get the number of events read + */ +unsigned long +ring_buffer_read_events_cpu(struct ring_buffer *buffer, int cpu) +{ +	struct ring_buffer_per_cpu *cpu_buffer; + +	if (!cpumask_test_cpu(cpu, buffer->cpumask)) +		return 0; + +	cpu_buffer = buffer->buffers[cpu]; +	return cpu_buffer->read; +} +EXPORT_SYMBOL_GPL(ring_buffer_read_events_cpu); + +/**   * ring_buffer_entries - get the number of entries in a buffer   * @buffer: The ring buffer   * @@ -2772,9 +3391,9 @@ void ring_buffer_iter_reset(struct ring_buffer_iter *iter)  	cpu_buffer = iter->cpu_buffer; -	spin_lock_irqsave(&cpu_buffer->reader_lock, flags); +	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);  	rb_iter_reset(iter); -	spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); +	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);  }  EXPORT_SYMBOL_GPL(ring_buffer_iter_reset); @@ -2895,6 +3514,10 @@ rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)  	if (cpu_buffer->commit_page == cpu_buffer->reader_page)  		goto out; +	/* Don't bother swapping if the ring buffer is empty */ +	if (rb_num_of_entries(cpu_buffer) == 0) +		goto out; +  	/*  	 * Reset the reader page to size zero.  	 */ @@ -2908,13 +3531,15 @@ rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)  	 * Splice the empty reader page into the list around the head.  	 */  	reader = rb_set_head_page(cpu_buffer); +	if (!reader) +		goto out;  	cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next);  	cpu_buffer->reader_page->list.prev = reader->list.prev;  	/*  	 * cpu_buffer->pages just needs to point to the buffer, it  	 *  has no specific buffer page to point to. Lets move it out -	 *  of our way so we don't accidently swap it. +	 *  of our way so we don't accidentally swap it.  	 
*/  	cpu_buffer->pages = reader->list.prev; @@ -3040,7 +3665,7 @@ static void rb_advance_iter(struct ring_buffer_iter *iter)  	/* check for end of page padding */  	if ((iter->head >= rb_page_size(iter->head_page)) &&  	    (iter->head_page != cpu_buffer->commit_page)) -		rb_advance_iter(iter); +		rb_inc_iter(iter);  }  static int rb_lost_events(struct ring_buffer_per_cpu *cpu_buffer) @@ -3233,12 +3858,12 @@ ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts,   again:  	local_irq_save(flags);  	if (dolock) -		spin_lock(&cpu_buffer->reader_lock); +		raw_spin_lock(&cpu_buffer->reader_lock);  	event = rb_buffer_peek(cpu_buffer, ts, lost_events);  	if (event && event->type_len == RINGBUF_TYPE_PADDING)  		rb_advance_reader(cpu_buffer);  	if (dolock) -		spin_unlock(&cpu_buffer->reader_lock); +		raw_spin_unlock(&cpu_buffer->reader_lock);  	local_irq_restore(flags);  	if (event && event->type_len == RINGBUF_TYPE_PADDING) @@ -3263,9 +3888,9 @@ ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)  	unsigned long flags;   again: -	spin_lock_irqsave(&cpu_buffer->reader_lock, flags); +	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);  	event = rb_iter_peek(iter, ts); -	spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); +	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);  	if (event && event->type_len == RINGBUF_TYPE_PADDING)  		goto again; @@ -3305,7 +3930,7 @@ ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts,  	cpu_buffer = buffer->buffers[cpu];  	local_irq_save(flags);  	if (dolock) -		spin_lock(&cpu_buffer->reader_lock); +		raw_spin_lock(&cpu_buffer->reader_lock);  	event = rb_buffer_peek(cpu_buffer, ts, lost_events);  	if (event) { @@ -3314,7 +3939,7 @@ ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts,  	}  	if (dolock) -		spin_unlock(&cpu_buffer->reader_lock); +		raw_spin_unlock(&cpu_buffer->reader_lock);  	local_irq_restore(flags);   out: @@ -3341,11 +3966,11 @@ EXPORT_SYMBOL_GPL(ring_buffer_consume);   * expected.   *   * After a sequence of ring_buffer_read_prepare calls, the user is - * expected to make at least one call to ring_buffer_prepare_sync. + * expected to make at least one call to ring_buffer_read_prepare_sync.   * Afterwards, ring_buffer_read_start is invoked to get things going   * for real.   * - * This overall must be paired with ring_buffer_finish. + * This overall must be paired with ring_buffer_read_finish.   */  struct ring_buffer_iter *  ring_buffer_read_prepare(struct ring_buffer *buffer, int cpu) @@ -3364,6 +3989,7 @@ ring_buffer_read_prepare(struct ring_buffer *buffer, int cpu)  	iter->cpu_buffer = cpu_buffer; +	atomic_inc(&buffer->resize_disabled);  	atomic_inc(&cpu_buffer->record_disabled);  	return iter; @@ -3393,7 +4019,7 @@ EXPORT_SYMBOL_GPL(ring_buffer_read_prepare_sync);   * an intervening ring_buffer_read_prepare_sync must have been   * performed.   * - * Must be paired with ring_buffer_finish. + * Must be paired with ring_buffer_read_finish.   
*/  void  ring_buffer_read_start(struct ring_buffer_iter *iter) @@ -3406,16 +4032,16 @@ ring_buffer_read_start(struct ring_buffer_iter *iter)  	cpu_buffer = iter->cpu_buffer; -	spin_lock_irqsave(&cpu_buffer->reader_lock, flags); +	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);  	arch_spin_lock(&cpu_buffer->lock);  	rb_iter_reset(iter);  	arch_spin_unlock(&cpu_buffer->lock); -	spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); +	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);  }  EXPORT_SYMBOL_GPL(ring_buffer_read_start);  /** - * ring_buffer_finish - finish reading the iterator of the buffer + * ring_buffer_read_finish - finish reading the iterator of the buffer   * @iter: The iterator retrieved by ring_buffer_start   *   * This re-enables the recording to the buffer, and frees the @@ -3425,8 +4051,20 @@ void  ring_buffer_read_finish(struct ring_buffer_iter *iter)  {  	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; +	unsigned long flags; + +	/* +	 * Ring buffer is disabled from recording, here's a good place +	 * to check the integrity of the ring buffer. +	 * Must prevent readers from trying to read, as the check +	 * clears the HEAD page and readers require it. +	 */ +	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); +	rb_check_pages(cpu_buffer); +	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);  	atomic_dec(&cpu_buffer->record_disabled); +	atomic_dec(&cpu_buffer->buffer->resize_disabled);  	kfree(iter);  }  EXPORT_SYMBOL_GPL(ring_buffer_read_finish); @@ -3445,7 +4083,7 @@ ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)  	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;  	unsigned long flags; -	spin_lock_irqsave(&cpu_buffer->reader_lock, flags); +	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);   again:  	event = rb_iter_peek(iter, ts);  	if (!event) @@ -3456,7 +4094,7 @@ ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)  	rb_advance_iter(iter);   out: -	spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); +	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);  	return event;  } @@ -3466,9 +4104,18 @@ EXPORT_SYMBOL_GPL(ring_buffer_read);   * ring_buffer_size - return the size of the ring buffer (in bytes)   * @buffer: The ring buffer.   */ -unsigned long ring_buffer_size(struct ring_buffer *buffer) +unsigned long ring_buffer_size(struct ring_buffer *buffer, int cpu)  { -	return BUF_PAGE_SIZE * buffer->pages; +	/* +	 * Earlier, this method returned +	 *	BUF_PAGE_SIZE * buffer->nr_pages +	 * Since the nr_pages field is now removed, we have converted this to +	 * return the per cpu buffer value. 
+	 */ +	if (!cpumask_test_cpu(cpu, buffer->cpumask)) +		return 0; + +	return BUF_PAGE_SIZE * buffer->buffers[cpu]->nr_pages;  }  EXPORT_SYMBOL_GPL(ring_buffer_size); @@ -3489,17 +4136,21 @@ rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)  	cpu_buffer->commit_page = cpu_buffer->head_page;  	INIT_LIST_HEAD(&cpu_buffer->reader_page->list); +	INIT_LIST_HEAD(&cpu_buffer->new_pages);  	local_set(&cpu_buffer->reader_page->write, 0);  	local_set(&cpu_buffer->reader_page->entries, 0);  	local_set(&cpu_buffer->reader_page->page->commit, 0);  	cpu_buffer->reader_page->read = 0; -	local_set(&cpu_buffer->commit_overrun, 0); +	local_set(&cpu_buffer->entries_bytes, 0);  	local_set(&cpu_buffer->overrun, 0); +	local_set(&cpu_buffer->commit_overrun, 0); +	local_set(&cpu_buffer->dropped_events, 0);  	local_set(&cpu_buffer->entries, 0);  	local_set(&cpu_buffer->committing, 0);  	local_set(&cpu_buffer->commits, 0);  	cpu_buffer->read = 0; +	cpu_buffer->read_bytes = 0;  	cpu_buffer->write_stamp = 0;  	cpu_buffer->read_stamp = 0; @@ -3523,9 +4174,13 @@ void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu)  	if (!cpumask_test_cpu(cpu, buffer->cpumask))  		return; +	atomic_inc(&buffer->resize_disabled);  	atomic_inc(&cpu_buffer->record_disabled); -	spin_lock_irqsave(&cpu_buffer->reader_lock, flags); +	/* Make sure all commits have finished */ +	synchronize_sched(); + +	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);  	if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing)))  		goto out; @@ -3537,9 +4192,10 @@ void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu)  	arch_spin_unlock(&cpu_buffer->lock);   out: -	spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); +	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);  	atomic_dec(&cpu_buffer->record_disabled); +	atomic_dec(&buffer->resize_disabled);  }  EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu); @@ -3575,10 +4231,10 @@ int ring_buffer_empty(struct ring_buffer *buffer)  		cpu_buffer = buffer->buffers[cpu];  		local_irq_save(flags);  		if (dolock) -			spin_lock(&cpu_buffer->reader_lock); +			raw_spin_lock(&cpu_buffer->reader_lock);  		ret = rb_per_cpu_empty(cpu_buffer);  		if (dolock) -			spin_unlock(&cpu_buffer->reader_lock); +			raw_spin_unlock(&cpu_buffer->reader_lock);  		local_irq_restore(flags);  		if (!ret) @@ -3609,10 +4265,10 @@ int ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu)  	cpu_buffer = buffer->buffers[cpu];  	local_irq_save(flags);  	if (dolock) -		spin_lock(&cpu_buffer->reader_lock); +		raw_spin_lock(&cpu_buffer->reader_lock);  	ret = rb_per_cpu_empty(cpu_buffer);  	if (dolock) -		spin_unlock(&cpu_buffer->reader_lock); +		raw_spin_unlock(&cpu_buffer->reader_lock);  	local_irq_restore(flags);  	return ret; @@ -3641,8 +4297,11 @@ int ring_buffer_swap_cpu(struct ring_buffer *buffer_a,  	    !cpumask_test_cpu(cpu, buffer_b->cpumask))  		goto out; +	cpu_buffer_a = buffer_a->buffers[cpu]; +	cpu_buffer_b = buffer_b->buffers[cpu]; +  	/* At least make sure the two buffers are somewhat the same */ -	if (buffer_a->pages != buffer_b->pages) +	if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages)  		goto out;  	ret = -EAGAIN; @@ -3656,9 +4315,6 @@ int ring_buffer_swap_cpu(struct ring_buffer *buffer_a,  	if (atomic_read(&buffer_b->record_disabled))  		goto out; -	cpu_buffer_a = buffer_a->buffers[cpu]; -	cpu_buffer_b = buffer_b->buffers[cpu]; -  	if (atomic_read(&cpu_buffer_a->record_disabled))  		goto out; @@ -3700,6 +4356,7 @@ EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu);  /**   * 
ring_buffer_alloc_read_page - allocate a page to read from buffer   * @buffer: the buffer to allocate for. + * @cpu: the cpu buffer to allocate.   *   * This function is used in conjunction with ring_buffer_read_page.   * When reading a full page from the ring buffer, these functions @@ -3712,16 +4369,17 @@ EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu);   * Returns:   *  The page allocated, or NULL on error.   */ -void *ring_buffer_alloc_read_page(struct ring_buffer *buffer) +void *ring_buffer_alloc_read_page(struct ring_buffer *buffer, int cpu)  {  	struct buffer_data_page *bpage; -	unsigned long addr; +	struct page *page; -	addr = __get_free_page(GFP_KERNEL); -	if (!addr) +	page = alloc_pages_node(cpu_to_node(cpu), +				GFP_KERNEL | __GFP_NORETRY, 0); +	if (!page)  		return NULL; -	bpage = (void *)addr; +	bpage = page_address(page);  	rb_init_page(bpage); @@ -3756,7 +4414,7 @@ EXPORT_SYMBOL_GPL(ring_buffer_free_read_page);   * to swap with a page in the ring buffer.   *   * for example: - *	rpage = ring_buffer_alloc_read_page(buffer); + *	rpage = ring_buffer_alloc_read_page(buffer, cpu);   *	if (!rpage)   *		return error;   *	ret = ring_buffer_read_page(buffer, &rpage, len, cpu, 0); @@ -3808,7 +4466,7 @@ int ring_buffer_read_page(struct ring_buffer *buffer,  	if (!bpage)  		goto out; -	spin_lock_irqsave(&cpu_buffer->reader_lock, flags); +	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);  	reader = rb_get_reader_page(cpu_buffer);  	if (!reader) @@ -3853,6 +4511,13 @@ int ring_buffer_read_page(struct ring_buffer *buffer,  		/* Need to copy one event at a time */  		do { +			/* We need the size of one event, because +			 * rb_advance_reader only advances by one event, +			 * whereas rb_event_ts_length may include the size of +			 * one or two events. +			 * We have already ensured there's enough space if this +			 * is a time extend. 
*/ +			size = rb_event_length(event);  			memcpy(bpage->data + pos, rpage->data + rpos, size);  			len -= size; @@ -3867,7 +4532,7 @@ int ring_buffer_read_page(struct ring_buffer *buffer,  			event = rb_reader_event(cpu_buffer);  			/* Always keep the time extend and data together */  			size = rb_event_ts_length(event); -		} while (len > size); +		} while (len >= size);  		/* update bpage */  		local_set(&bpage->commit, pos); @@ -3878,6 +4543,7 @@ int ring_buffer_read_page(struct ring_buffer *buffer,  	} else {  		/* update the entry counter */  		cpu_buffer->read += rb_page_entries(reader); +		cpu_buffer->read_bytes += BUF_PAGE_SIZE;  		/* swap the pages */  		rb_init_page(bpage); @@ -3924,84 +4590,13 @@ int ring_buffer_read_page(struct ring_buffer *buffer,  		memset(&bpage->data[commit], 0, BUF_PAGE_SIZE - commit);   out_unlock: -	spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); +	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);   out:  	return ret;  }  EXPORT_SYMBOL_GPL(ring_buffer_read_page); -#ifdef CONFIG_TRACING -static ssize_t -rb_simple_read(struct file *filp, char __user *ubuf, -	       size_t cnt, loff_t *ppos) -{ -	unsigned long *p = filp->private_data; -	char buf[64]; -	int r; - -	if (test_bit(RB_BUFFERS_DISABLED_BIT, p)) -		r = sprintf(buf, "permanently disabled\n"); -	else -		r = sprintf(buf, "%d\n", test_bit(RB_BUFFERS_ON_BIT, p)); - -	return simple_read_from_buffer(ubuf, cnt, ppos, buf, r); -} - -static ssize_t -rb_simple_write(struct file *filp, const char __user *ubuf, -		size_t cnt, loff_t *ppos) -{ -	unsigned long *p = filp->private_data; -	char buf[64]; -	unsigned long val; -	int ret; - -	if (cnt >= sizeof(buf)) -		return -EINVAL; - -	if (copy_from_user(&buf, ubuf, cnt)) -		return -EFAULT; - -	buf[cnt] = 0; - -	ret = strict_strtoul(buf, 10, &val); -	if (ret < 0) -		return ret; - -	if (val) -		set_bit(RB_BUFFERS_ON_BIT, p); -	else -		clear_bit(RB_BUFFERS_ON_BIT, p); - -	(*ppos)++; - -	return cnt; -} - -static const struct file_operations rb_simple_fops = { -	.open		= tracing_open_generic, -	.read		= rb_simple_read, -	.write		= rb_simple_write, -	.llseek		= default_llseek, -}; - - -static __init int rb_init_debugfs(void) -{ -	struct dentry *d_tracer; - -	d_tracer = tracing_init_dentry(); - -	trace_create_file("tracing_on", 0644, d_tracer, -			    &ring_buffer_flags, &rb_simple_fops); - -	return 0; -} - -fs_initcall(rb_init_debugfs); -#endif -  #ifdef CONFIG_HOTPLUG_CPU  static int rb_cpu_notify(struct notifier_block *self,  			 unsigned long action, void *hcpu) @@ -4009,6 +4604,8 @@ static int rb_cpu_notify(struct notifier_block *self,  	struct ring_buffer *buffer =  		container_of(self, struct ring_buffer, cpu_notify);  	long cpu = (long)hcpu; +	int cpu_i, nr_pages_same; +	unsigned int nr_pages;  	switch (action) {  	case CPU_UP_PREPARE: @@ -4016,8 +4613,23 @@ static int rb_cpu_notify(struct notifier_block *self,  		if (cpumask_test_cpu(cpu, buffer->cpumask))  			return NOTIFY_OK; +		nr_pages = 0; +		nr_pages_same = 1; +		/* check if all cpu sizes are same */ +		for_each_buffer_cpu(buffer, cpu_i) { +			/* fill in the size from first enabled cpu */ +			if (nr_pages == 0) +				nr_pages = buffer->buffers[cpu_i]->nr_pages; +			if (nr_pages != buffer->buffers[cpu_i]->nr_pages) { +				nr_pages_same = 0; +				break; +			} +		} +		/* allocate minimum pages, user can later expand it */ +		if (!nr_pages_same) +			nr_pages = 2;  		buffer->buffers[cpu] = -			rb_allocate_cpu_buffer(buffer, cpu); +			rb_allocate_cpu_buffer(buffer, nr_pages, cpu);  		if 
(!buffer->buffers[cpu]) {
 			WARN(1, "failed to allocate ring buffer on CPU %ld\n",
 			     cpu);
@@ -4040,3 +4652,320 @@ static int rb_cpu_notify(struct notifier_block *self,
 	return NOTIFY_OK;
 }
 #endif
+
+#ifdef CONFIG_RING_BUFFER_STARTUP_TEST
+/*
+ * This is a basic integrity check of the ring buffer.
+ * Late in the boot cycle this test will run when configured in.
+ * It will kick off a thread per CPU that will go into a loop
+ * writing to the per cpu ring buffer various sizes of data.
+ * Some of the data will be large items, some small.
+ *
+ * Another thread is created that goes into a spin, sending out
+ * IPIs to the other CPUs to also write into the ring buffer.
+ * This is to test the nesting ability of the buffer.
+ *
+ * Basic stats are recorded and reported. If something unexpected
+ * happens in the ring buffer, a big warning is displayed and all
+ * ring buffers are disabled.
+ */
+static struct task_struct *rb_threads[NR_CPUS] __initdata;
+
+struct rb_test_data {
+	struct ring_buffer	*buffer;
+	unsigned long		events;
+	unsigned long		bytes_written;
+	unsigned long		bytes_alloc;
+	unsigned long		bytes_dropped;
+	unsigned long		events_nested;
+	unsigned long		bytes_written_nested;
+	unsigned long		bytes_alloc_nested;
+	unsigned long		bytes_dropped_nested;
+	int			min_size_nested;
+	int			max_size_nested;
+	int			max_size;
+	int			min_size;
+	int			cpu;
+	int			cnt;
+};
+
+static struct rb_test_data rb_data[NR_CPUS] __initdata;
+
+/* 1 meg per cpu */
+#define RB_TEST_BUFFER_SIZE	1048576
+
+static char rb_string[] __initdata =
+	"abcdefghijklmnopqrstuvwxyz1234567890!@#$%^&*()?+\\"
+	"?+|:';\",.<>/?abcdefghijklmnopqrstuvwxyz1234567890"
+	"!@#$%^&*()?+\\?+|:';\",.<>/?abcdefghijklmnopqrstuv";
+
+static bool rb_test_started __initdata;
+
+struct rb_item {
+	int size;
+	char str[];
+};
+
+static __init int rb_write_something(struct rb_test_data *data, bool nested)
+{
+	struct ring_buffer_event *event;
+	struct rb_item *item;
+	bool started;
+	int event_len;
+	int size;
+	int len;
+	int cnt;
+
+	/* Have nested writes different than what is written */
+	cnt = data->cnt + (nested ? 27 : 0);
+
+	/* Multiply cnt by ~e, to make some unique increment */
+	size = (cnt * 68 / 25) % (sizeof(rb_string) - 1);
+
+	len = size + sizeof(struct rb_item);
+
+	started = rb_test_started;
+	/* read rb_test_started before checking buffer enabled */
+	smp_rmb();
+
+	event = ring_buffer_lock_reserve(data->buffer, len);
+	if (!event) {
+		/* Ignore dropped events before test starts. */
+		if (started) {
+			if (nested)
+				data->bytes_dropped_nested += len;
+			else
+				data->bytes_dropped += len;
+		}
+		return len;
+	}
+
+	event_len = ring_buffer_event_length(event);
+
+	if (RB_WARN_ON(data->buffer, event_len < len))
+		goto out;
+
+	item = ring_buffer_event_data(event);
+	item->size = size;
+	memcpy(item->str, rb_string, size);
+
+	if (nested) {
+		data->bytes_alloc_nested += event_len;
+		data->bytes_written_nested += len;
+		data->events_nested++;
+		if (!data->min_size_nested || len < data->min_size_nested)
+			data->min_size_nested = len;
+		if (len > data->max_size_nested)
+			data->max_size_nested = len;
+	} else {
+		data->bytes_alloc += event_len;
+		data->bytes_written += len;
+		data->events++;
+		if (!data->min_size || len < data->min_size)
+			data->min_size = len;
+		if (len > data->max_size)
+			data->max_size = len;
+	}
+
+ out:
+	ring_buffer_unlock_commit(data->buffer, event);
+
+	return 0;
+}
+
+static __init int rb_test(void *arg)
+{
+	struct rb_test_data *data = arg;
+
+	while (!kthread_should_stop()) {
+		rb_write_something(data, false);
+		data->cnt++;
+
+		set_current_state(TASK_INTERRUPTIBLE);
+		/* Now sleep between a min of 100-300us and a max of 1ms */
+		usleep_range(((data->cnt % 3) + 1) * 100, 1000);
+	}
+
+	return 0;
+}
+
+static __init void rb_ipi(void *ignore)
+{
+	struct rb_test_data *data;
+	int cpu = smp_processor_id();
+
+	data = &rb_data[cpu];
+	rb_write_something(data, true);
+}
+
+static __init int rb_hammer_test(void *arg)
+{
+	while (!kthread_should_stop()) {
+
+		/* Send an IPI to all cpus to write data! */
+		smp_call_function(rb_ipi, NULL, 1);
+		/* No sleep, but for non preempt, let others run */
+		schedule();
+	}
+
+	return 0;
+}
+
+static __init int test_ringbuffer(void)
+{
+	struct task_struct *rb_hammer;
+	struct ring_buffer *buffer;
+	int cpu;
+	int ret = 0;
+
+	pr_info("Running ring buffer tests...\n");
+
+	buffer = ring_buffer_alloc(RB_TEST_BUFFER_SIZE, RB_FL_OVERWRITE);
+	if (WARN_ON(!buffer))
+		return 0;
+
+	/* Disable buffer so that threads can't write to it yet */
+	ring_buffer_record_off(buffer);
+
+	for_each_online_cpu(cpu) {
+		rb_data[cpu].buffer = buffer;
+		rb_data[cpu].cpu = cpu;
+		rb_data[cpu].cnt = cpu;
+		rb_threads[cpu] = kthread_create(rb_test, &rb_data[cpu],
+						 "rbtester/%d", cpu);
+		if (WARN_ON(!rb_threads[cpu])) {
+			pr_cont("FAILED\n");
+			ret = -1;
+			goto out_free;
+		}
+
+		kthread_bind(rb_threads[cpu], cpu);
+		wake_up_process(rb_threads[cpu]);
+	}
+
+	/* Now create the rb hammer! */
+	rb_hammer = kthread_run(rb_hammer_test, NULL, "rbhammer");
+	if (WARN_ON(!rb_hammer)) {
+		pr_cont("FAILED\n");
+		ret = -1;
+		goto out_free;
+	}
+
+	ring_buffer_record_on(buffer);
+	/*
+	 * Show buffer is enabled before setting rb_test_started.
+	 * Yes there's a small race window where events could be
+	 * dropped and the thread won't catch it. But when a ring
+	 * buffer gets enabled, there will always be some kind of
+	 * delay before other CPUs see it. Thus, we don't care about
+	 * those dropped events. We care about events dropped after
+	 * the threads see that the buffer is active.
+	 */
+	smp_wmb();
+	rb_test_started = true;
+
+	set_current_state(TASK_INTERRUPTIBLE);
+	/* Just run for 10 seconds */
+	schedule_timeout(10 * HZ);
+
+	kthread_stop(rb_hammer);
+
+ out_free:
+	for_each_online_cpu(cpu) {
+		if (!rb_threads[cpu])
+			break;
+		kthread_stop(rb_threads[cpu]);
+	}
+	if (ret) {
+		ring_buffer_free(buffer);
+		return ret;
+	}
+
+	/* Report! */
+	pr_info("finished\n");
+	for_each_online_cpu(cpu) {
+		struct ring_buffer_event *event;
+		struct rb_test_data *data = &rb_data[cpu];
+		struct rb_item *item;
+		unsigned long total_events;
+		unsigned long total_dropped;
+		unsigned long total_written;
+		unsigned long total_alloc;
+		unsigned long total_read = 0;
+		unsigned long total_size = 0;
+		unsigned long total_len = 0;
+		unsigned long total_lost = 0;
+		unsigned long lost;
+		int big_event_size;
+		int small_event_size;
+
+		ret = -1;
+
+		total_events = data->events + data->events_nested;
+		total_written = data->bytes_written + data->bytes_written_nested;
+		total_alloc = data->bytes_alloc + data->bytes_alloc_nested;
+		total_dropped = data->bytes_dropped + data->bytes_dropped_nested;
+
+		big_event_size = data->max_size + data->max_size_nested;
+		small_event_size = data->min_size + data->min_size_nested;
+
+		pr_info("CPU %d:\n", cpu);
+		pr_info("              events:    %ld\n", total_events);
+		pr_info("       dropped bytes:    %ld\n", total_dropped);
+		pr_info("       alloced bytes:    %ld\n", total_alloc);
+		pr_info("       written bytes:    %ld\n", total_written);
+		pr_info("       biggest event:    %d\n", big_event_size);
+		pr_info("      smallest event:    %d\n", small_event_size);
+
+		if (RB_WARN_ON(buffer, total_dropped))
+			break;
+
+		ret = 0;
+
+		while ((event = ring_buffer_consume(buffer, cpu, NULL, &lost))) {
+			total_lost += lost;
+			item = ring_buffer_event_data(event);
+			total_len += ring_buffer_event_length(event);
+			total_size += item->size + sizeof(struct rb_item);
+			if (memcmp(&item->str[0], rb_string, item->size) != 0) {
+				pr_info("FAILED!\n");
+				pr_info("buffer had: %.*s\n", item->size, item->str);
+				pr_info("expected:   %.*s\n", item->size, rb_string);
+				RB_WARN_ON(buffer, 1);
+				ret = -1;
+				break;
+			}
+			total_read++;
+		}
+		if (ret)
+			break;
+
+		ret = -1;
+
+		pr_info("         read events:   %ld\n", total_read);
+		pr_info("         lost events:   %ld\n", total_lost);
+		pr_info("        total events:   %ld\n", total_lost + total_read);
+		pr_info("  recorded len bytes:   %ld\n", total_len);
+		pr_info(" recorded size bytes:   %ld\n", total_size);
+		if (total_lost)
+			pr_info(" With dropped events, record len and size may not match\n"
+				" alloced and written from above\n");
+		if (!total_lost) {
+			if (RB_WARN_ON(buffer, total_len != total_alloc ||
+				       total_size != total_written))
+				break;
+		}
+		if (RB_WARN_ON(buffer, total_lost + total_read != total_events))
+			break;
+
+		ret = 0;
+	}
+	if (!ret)
+		pr_info("Ring buffer PASSED!\n");
+
+	ring_buffer_free(buffer);
+	return 0;
+}
+
+late_initcall(test_ringbuffer);
+#endif /* CONFIG_RING_BUFFER_STARTUP_TEST */
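
As a usage illustration (not part of the patch above), here is a minimal sketch of how a reader inside ring_buffer.c might drive the per-cpu read-page interface after this change: ring_buffer_alloc_read_page() now takes the target cpu so the swap page is allocated on that cpu's node, ring_buffer_read_page() fills or swaps it, and ring_buffer_free_read_page() releases it. The helper name example_read_one_page() is hypothetical, and the sketch assumes it lives in this file, where struct buffer_data_page is visible.

/* Illustrative sketch only, not from this patch. */
static int example_read_one_page(struct ring_buffer *buffer, int cpu)
{
	struct buffer_data_page *bpage;
	void *rpage;
	int ret;

	/* Swap page is now allocated on the node of the target cpu */
	rpage = ring_buffer_alloc_read_page(buffer, cpu);
	if (!rpage)
		return -ENOMEM;

	/* Read at most one page; last argument 0 accepts a partial page */
	ret = ring_buffer_read_page(buffer, &rpage, PAGE_SIZE, cpu, 0);
	if (ret >= 0) {
		bpage = rpage;
		/* bpage->data now holds the copied (or swapped) events */
		pr_info("cpu%d: %ld bytes of event data, first event at %d\n",
			cpu, local_read(&bpage->commit), ret);
	}

	ring_buffer_free_read_page(buffer, rpage);
	return ret < 0 ? ret : 0;
}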

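Along the same lines, ring_buffer_size() is now per cpu (see the comment added with it above, explaining that the buffer-wide page count was removed), so a caller that wants the old whole-buffer figure has to sum over CPUs. A small sketch under that assumption; the helper name is made up.

/* Illustrative sketch only, not from this patch. */
static unsigned long example_total_buffer_size(struct ring_buffer *buffer)
{
	unsigned long total = 0;
	int cpu;

	/* ring_buffer_size() returns 0 for cpus the buffer does not cover */
	for_each_online_cpu(cpu)
		total += ring_buffer_size(buffer, cpu);

	return total;
}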