diff options
Diffstat (limited to 'kernel/trace/ring_buffer.c')
-rw-r--r-- | kernel/trace/ring_buffer.c | 693 |
1 files changed, 529 insertions, 164 deletions
diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c index bd38c5cfd8a..960cbf44c84 100644 --- a/kernel/trace/ring_buffer.c +++ b/kernel/trace/ring_buffer.c @@ -4,21 +4,92 @@ * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com> */ #include <linux/ring_buffer.h> +#include <linux/trace_clock.h> +#include <linux/ftrace_irq.h> #include <linux/spinlock.h> #include <linux/debugfs.h> #include <linux/uaccess.h> +#include <linux/hardirq.h> #include <linux/module.h> #include <linux/percpu.h> #include <linux/mutex.h> -#include <linux/sched.h> /* used for sched_clock() (for now) */ #include <linux/init.h> #include <linux/hash.h> #include <linux/list.h> +#include <linux/cpu.h> #include <linux/fs.h> #include "trace.h" /* + * The ring buffer is made up of a list of pages. A separate list of pages is + * allocated for each CPU. A writer may only write to a buffer that is + * associated with the CPU it is currently executing on. A reader may read + * from any per cpu buffer. + * + * The reader is special. For each per cpu buffer, the reader has its own + * reader page. When a reader has read the entire reader page, this reader + * page is swapped with another page in the ring buffer. + * + * Now, as long as the writer is off the reader page, the reader can do what + * ever it wants with that page. The writer will never write to that page + * again (as long as it is out of the ring buffer). + * + * Here's some silly ASCII art. + * + * +------+ + * |reader| RING BUFFER + * |page | + * +------+ +---+ +---+ +---+ + * | |-->| |-->| | + * +---+ +---+ +---+ + * ^ | + * | | + * +---------------+ + * + * + * +------+ + * |reader| RING BUFFER + * |page |------------------v + * +------+ +---+ +---+ +---+ + * | |-->| |-->| | + * +---+ +---+ +---+ + * ^ | + * | | + * +---------------+ + * + * + * +------+ + * |reader| RING BUFFER + * |page |------------------v + * +------+ +---+ +---+ +---+ + * ^ | |-->| |-->| | + * | +---+ +---+ +---+ + * | | + * | | + * +------------------------------+ + * + * + * +------+ + * |buffer| RING BUFFER + * |page |------------------v + * +------+ +---+ +---+ +---+ + * ^ | | | |-->| | + * | New +---+ +---+ +---+ + * | Reader------^ | + * | page | + * +------------------------------+ + * + * + * After we make this swap, the reader can hand this page off to the splice + * code and be done with it. It can even allocate a new page if it needs to + * and swap that into the ring buffer. + * + * We will be using cmpxchg soon to make all this lockless. + * + */ + +/* * A fast way to enable or disable all ring buffers is to * call tracing_on or tracing_off. Turning off the ring buffers * prevents all ring buffers from being recorded to. @@ -57,7 +128,9 @@ enum { RB_BUFFERS_DISABLED = 1 << RB_BUFFERS_DISABLED_BIT, }; -static long ring_buffer_flags __read_mostly = RB_BUFFERS_ON; +static unsigned long ring_buffer_flags __read_mostly = RB_BUFFERS_ON; + +#define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data) /** * tracing_on - enable all tracing buffers @@ -89,59 +162,92 @@ EXPORT_SYMBOL_GPL(tracing_off); * tracing_off_permanent - permanently disable ring buffers * * This function, once called, will disable all ring buffers - * permanenty. + * permanently. */ void tracing_off_permanent(void) { set_bit(RB_BUFFERS_DISABLED_BIT, &ring_buffer_flags); } +/** + * tracing_is_on - show state of ring buffers enabled + */ +int tracing_is_on(void) +{ + return ring_buffer_flags == RB_BUFFERS_ON; +} +EXPORT_SYMBOL_GPL(tracing_is_on); + #include "trace.h" -/* Up this if you want to test the TIME_EXTENTS and normalization */ -#define DEBUG_SHIFT 0 +#define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array)) +#define RB_ALIGNMENT 4U +#define RB_MAX_SMALL_DATA 28 + +enum { + RB_LEN_TIME_EXTEND = 8, + RB_LEN_TIME_STAMP = 16, +}; -/* FIXME!!! */ -u64 ring_buffer_time_stamp(int cpu) +static inline int rb_null_event(struct ring_buffer_event *event) { - u64 time; + return event->type == RINGBUF_TYPE_PADDING && event->time_delta == 0; +} - preempt_disable_notrace(); - /* shift to debug/test normalization and TIME_EXTENTS */ - time = sched_clock() << DEBUG_SHIFT; - preempt_enable_no_resched_notrace(); +static inline int rb_discarded_event(struct ring_buffer_event *event) +{ + return event->type == RINGBUF_TYPE_PADDING && event->time_delta; +} - return time; +static void rb_event_set_padding(struct ring_buffer_event *event) +{ + event->type = RINGBUF_TYPE_PADDING; + event->time_delta = 0; } -EXPORT_SYMBOL_GPL(ring_buffer_time_stamp); -void ring_buffer_normalize_time_stamp(int cpu, u64 *ts) +/** + * ring_buffer_event_discard - discard an event in the ring buffer + * @buffer: the ring buffer + * @event: the event to discard + * + * Sometimes a event that is in the ring buffer needs to be ignored. + * This function lets the user discard an event in the ring buffer + * and then that event will not be read later. + * + * Note, it is up to the user to be careful with this, and protect + * against races. If the user discards an event that has been consumed + * it is possible that it could corrupt the ring buffer. + */ +void ring_buffer_event_discard(struct ring_buffer_event *event) { - /* Just stupid testing the normalize function and deltas */ - *ts >>= DEBUG_SHIFT; + event->type = RINGBUF_TYPE_PADDING; + /* time delta must be non zero */ + if (!event->time_delta) + event->time_delta = 1; } -EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp); -#define RB_EVNT_HDR_SIZE (sizeof(struct ring_buffer_event)) -#define RB_ALIGNMENT_SHIFT 2 -#define RB_ALIGNMENT (1 << RB_ALIGNMENT_SHIFT) -#define RB_MAX_SMALL_DATA 28 +static unsigned +rb_event_data_length(struct ring_buffer_event *event) +{ + unsigned length; -enum { - RB_LEN_TIME_EXTEND = 8, - RB_LEN_TIME_STAMP = 16, -}; + if (event->len) + length = event->len * RB_ALIGNMENT; + else + length = event->array[0]; + return length + RB_EVNT_HDR_SIZE; +} /* inline for ring buffer fast paths */ -static inline unsigned +static unsigned rb_event_length(struct ring_buffer_event *event) { - unsigned length; - switch (event->type) { case RINGBUF_TYPE_PADDING: - /* undefined */ - return -1; + if (rb_null_event(event)) + /* undefined */ + return -1; + return rb_event_data_length(event); case RINGBUF_TYPE_TIME_EXTEND: return RB_LEN_TIME_EXTEND; @@ -150,11 +256,7 @@ rb_event_length(struct ring_buffer_event *event) return RB_LEN_TIME_STAMP; case RINGBUF_TYPE_DATA: - if (event->len) - length = event->len << RB_ALIGNMENT_SHIFT; - else - length = event->array[0]; - return length + RB_EVNT_HDR_SIZE; + return rb_event_data_length(event); default: BUG(); } @@ -179,7 +281,7 @@ unsigned ring_buffer_event_length(struct ring_buffer_event *event) EXPORT_SYMBOL_GPL(ring_buffer_event_length); /* inline for ring buffer fast paths */ -static inline void * +static void * rb_event_data(struct ring_buffer_event *event) { BUG_ON(event->type != RINGBUF_TYPE_DATA); @@ -209,7 +311,7 @@ EXPORT_SYMBOL_GPL(ring_buffer_event_data); struct buffer_data_page { u64 time_stamp; /* page time stamp */ - local_t commit; /* write commited index */ + local_t commit; /* write committed index */ unsigned char data[]; /* data of buffer page */ }; @@ -225,14 +327,25 @@ static void rb_init_page(struct buffer_data_page *bpage) local_set(&bpage->commit, 0); } +/** + * ring_buffer_page_len - the size of data on the page. + * @page: The page to read + * + * Returns the amount of data on the page, including buffer page header. + */ +size_t ring_buffer_page_len(void *page) +{ + return local_read(&((struct buffer_data_page *)page)->commit) + + BUF_PAGE_HDR_SIZE; +} + /* * Also stolen from mm/slob.c. Thanks to Mathieu Desnoyers for pointing * this issue out. */ -static inline void free_buffer_page(struct buffer_page *bpage) +static void free_buffer_page(struct buffer_page *bpage) { - if (bpage->page) - free_page((unsigned long)bpage->page); + free_page((unsigned long)bpage->page); kfree(bpage); } @@ -246,7 +359,7 @@ static inline int test_time_stamp(u64 delta) return 0; } -#define BUF_PAGE_SIZE (PAGE_SIZE - offsetof(struct buffer_data_page, data)) +#define BUF_PAGE_SIZE (PAGE_SIZE - BUF_PAGE_HDR_SIZE) /* * head_page == tail_page && head == tail then buffer is empty. @@ -260,7 +373,7 @@ struct ring_buffer_per_cpu { struct list_head pages; struct buffer_page *head_page; /* read from head */ struct buffer_page *tail_page; /* write to tail */ - struct buffer_page *commit_page; /* commited pages */ + struct buffer_page *commit_page; /* committed pages */ struct buffer_page *reader_page; unsigned long overrun; unsigned long entries; @@ -273,12 +386,17 @@ struct ring_buffer { unsigned pages; unsigned flags; int cpus; - cpumask_var_t cpumask; atomic_t record_disabled; + cpumask_var_t cpumask; struct mutex mutex; struct ring_buffer_per_cpu **buffers; + +#ifdef CONFIG_HOTPLUG_CPU + struct notifier_block cpu_notify; +#endif + u64 (*clock)(void); }; struct ring_buffer_iter { @@ -299,11 +417,35 @@ struct ring_buffer_iter { _____ret; \ }) +/* Up this if you want to test the TIME_EXTENTS and normalization */ +#define DEBUG_SHIFT 0 + +u64 ring_buffer_time_stamp(struct ring_buffer *buffer, int cpu) +{ + u64 time; + + preempt_disable_notrace(); + /* shift to debug/test normalization and TIME_EXTENTS */ + time = buffer->clock() << DEBUG_SHIFT; + preempt_enable_no_resched_notrace(); + + return time; +} +EXPORT_SYMBOL_GPL(ring_buffer_time_stamp); + +void ring_buffer_normalize_time_stamp(struct ring_buffer *buffer, + int cpu, u64 *ts) +{ + /* Just stupid testing the normalize function and deltas */ + *ts >>= DEBUG_SHIFT; +} +EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp); + /** * check_pages - integrity check of buffer pages * @cpu_buffer: CPU buffer with pages to test * - * As a safty measure we check to make sure the data pages have not + * As a safety measure we check to make sure the data pages have not * been corrupted. */ static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer) @@ -421,7 +563,6 @@ static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) struct list_head *head = &cpu_buffer->pages; struct buffer_page *bpage, *tmp; - list_del_init(&cpu_buffer->reader_page->list); free_buffer_page(cpu_buffer->reader_page); list_for_each_entry_safe(bpage, tmp, head, list) { @@ -437,6 +578,11 @@ static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) */ extern int ring_buffer_page_too_big(void); +#ifdef CONFIG_HOTPLUG_CPU +static int rb_cpu_notify(struct notifier_block *self, + unsigned long action, void *hcpu); +#endif + /** * ring_buffer_alloc - allocate a new ring_buffer * @size: the size in bytes per cpu that is needed. @@ -469,12 +615,23 @@ struct ring_buffer *ring_buffer_alloc(unsigned long size, unsigned flags) buffer->pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE); buffer->flags = flags; + buffer->clock = trace_clock_local; /* need at least two pages */ if (buffer->pages == 1) buffer->pages++; + /* + * In case of non-hotplug cpu, if the ring-buffer is allocated + * in early initcall, it will not be notified of secondary cpus. + * In that off case, we need to allocate for all possible cpus. + */ +#ifdef CONFIG_HOTPLUG_CPU + get_online_cpus(); + cpumask_copy(buffer->cpumask, cpu_online_mask); +#else cpumask_copy(buffer->cpumask, cpu_possible_mask); +#endif buffer->cpus = nr_cpu_ids; bsize = sizeof(void *) * nr_cpu_ids; @@ -490,6 +647,13 @@ struct ring_buffer *ring_buffer_alloc(unsigned long size, unsigned flags) goto fail_free_buffers; } +#ifdef CONFIG_HOTPLUG_CPU + buffer->cpu_notify.notifier_call = rb_cpu_notify; + buffer->cpu_notify.priority = 0; + register_cpu_notifier(&buffer->cpu_notify); +#endif + + put_online_cpus(); mutex_init(&buffer->mutex); return buffer; @@ -503,6 +667,7 @@ struct ring_buffer *ring_buffer_alloc(unsigned long size, unsigned flags) fail_free_cpumask: free_cpumask_var(buffer->cpumask); + put_online_cpus(); fail_free_buffer: kfree(buffer); @@ -519,15 +684,29 @@ ring_buffer_free(struct ring_buffer *buffer) { int cpu; + get_online_cpus(); + +#ifdef CONFIG_HOTPLUG_CPU + unregister_cpu_notifier(&buffer->cpu_notify); +#endif + for_each_buffer_cpu(buffer, cpu) rb_free_cpu_buffer(buffer->buffers[cpu]); + put_online_cpus(); + free_cpumask_var(buffer->cpumask); kfree(buffer); } EXPORT_SYMBOL_GPL(ring_buffer_free); +void ring_buffer_set_clock(struct ring_buffer *buffer, + u64 (*clock)(void)) +{ + buffer->clock = clock; +} + static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer); static void @@ -627,16 +806,15 @@ int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size) return size; mutex_lock(&buffer->mutex); + get_online_cpus(); nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE); if (size < buffer_size) { /* easy case, just free pages */ - if (RB_WARN_ON(buffer, nr_pages >= buffer->pages)) { - mutex_unlock(&buffer->mutex); - return -1; - } + if (RB_WARN_ON(buffer, nr_pages >= buffer->pages)) + goto out_fail; rm_pages = buffer->pages - nr_pages; @@ -655,10 +833,8 @@ int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size) * add these pages to the cpu_buffers. Otherwise we just free * them all and return -ENOMEM; */ - if (RB_WARN_ON(buffer, nr_pages <= buffer->pages)) { - mutex_unlock(&buffer->mutex); - return -1; - } + if (RB_WARN_ON(buffer, nr_pages <= buffer->pages)) + goto out_fail; new_pages = nr_pages - buffer->pages; @@ -683,13 +859,12 @@ int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size) rb_insert_pages(cpu_buffer, &pages, new_pages); } - if (RB_WARN_ON(buffer, !list_empty(&pages))) { - mutex_unlock(&buffer->mutex); - return -1; - } + if (RB_WARN_ON(buffer, !list_empty(&pages))) + goto out_fail; out: buffer->pages = nr_pages; + put_online_cpus(); mutex_unlock(&buffer->mutex); return size; @@ -699,15 +874,20 @@ int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size) list_del_init(&bpage->list); free_buffer_page(bpage); } + put_online_cpus(); mutex_unlock(&buffer->mutex); return -ENOMEM; -} -EXPORT_SYMBOL_GPL(ring_buffer_resize); -static inline int rb_null_event(struct ring_buffer_event *event) -{ - return event->type == RINGBUF_TYPE_PADDING; + /* + * Something went totally wrong, and we are too paranoid + * to even clean up the mess. + */ + out_fail: + put_online_cpus(); + mutex_unlock(&buffer->mutex); + return -1; } +EXPORT_SYMBOL_GPL(ring_buffer_resize); static inline void * __rb_data_page_index(struct buffer_data_page *bpage, unsigned index) @@ -811,7 +991,7 @@ rb_event_index(struct ring_buffer_event *event) return (addr & ~PAGE_MASK) - (PAGE_SIZE - BUF_PAGE_SIZE); } -static inline int +static int rb_is_commit(struct ring_buffer_per_cpu *cpu_buffer, struct ring_buffer_event *event) { @@ -825,7 +1005,7 @@ rb_is_commit(struct ring_buffer_per_cpu *cpu_buffer, rb_commit_index(cpu_buffer) == index; } -static inline void +static void rb_set_commit_event(struct ring_buffer_per_cpu *cpu_buffer, struct ring_buffer_event *event) { @@ -850,7 +1030,7 @@ rb_set_commit_event(struct ring_buffer_per_cpu *cpu_buffer, local_set(&cpu_buffer->commit_page->page->commit, index); } -static inline void +static void rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer) { /* @@ -896,7 +1076,7 @@ static void rb_reset_reader_page(struct ring_buffer_per_cpu *cpu_buffer) cpu_buffer->reader_page->read = 0; } -static inline void rb_inc_iter(struct ring_buffer_iter *iter) +static void rb_inc_iter(struct ring_buffer_iter *iter) { struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; @@ -926,7 +1106,7 @@ static inline void rb_inc_iter(struct ring_buffer_iter *iter) * and with this, we can determine what to place into the * data field. */ -static inline void +static void rb_update_event(struct ring_buffer_event *event, unsigned type, unsigned length) { @@ -938,15 +1118,11 @@ rb_update_event(struct ring_buffer_event *event, break; case RINGBUF_TYPE_TIME_EXTEND: - event->len = - (RB_LEN_TIME_EXTEND + (RB_ALIGNMENT-1)) - >> RB_ALIGNMENT_SHIFT; + event->len = DIV_ROUND_UP(RB_LEN_TIME_EXTEND, RB_ALIGNMENT); break; case RINGBUF_TYPE_TIME_STAMP: - event->len = - (RB_LEN_TIME_STAMP + (RB_ALIGNMENT-1)) - >> RB_ALIGNMENT_SHIFT; + event->len = DIV_ROUND_UP(RB_LEN_TIME_STAMP, RB_ALIGNMENT); break; case RINGBUF_TYPE_DATA: @@ -955,16 +1131,14 @@ rb_update_event(struct ring_buffer_event *event, event->len = 0; event->array[0] = length; } else - event->len = - (length + (RB_ALIGNMENT-1)) - >> RB_ALIGNMENT_SHIFT; + event->len = DIV_ROUND_UP(length, RB_ALIGNMENT); break; default: BUG(); } } -static inline unsigned rb_calculate_event_length(unsigned length) +static unsigned rb_calculate_event_length(unsigned length) { struct ring_buffer_event event; /* Used only for sizeof array */ @@ -990,6 +1164,7 @@ __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer, struct ring_buffer *buffer = cpu_buffer->buffer; struct ring_buffer_event *event; unsigned long flags; + bool lock_taken = false; commit_page = cpu_buffer->commit_page; /* we just need to protect against interrupts */ @@ -1003,7 +1178,30 @@ __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer, struct buffer_page *next_page = tail_page; local_irq_save(flags); - __raw_spin_lock(&cpu_buffer->lock); + /* + * Since the write to the buffer is still not + * fully lockless, we must be careful with NMIs. + * The locks in the writers are taken when a write + * crosses to a new page. The locks protect against + * races with the readers (this will soon be fixed + * with a lockless solution). + * + * Because we can not protect against NMIs, and we + * want to keep traces reentrant, we need to manage + * what happens when we are in an NMI. + * + * NMIs can happen after we take the lock. + * If we are in an NMI, only take the lock + * if it is not already taken. Otherwise + * simply fail. + */ + if (unlikely(in_nmi())) { + if (!__raw_spin_trylock(&cpu_buffer->lock)) + goto out_reset; + } else + __raw_spin_lock(&cpu_buffer->lock); + + lock_taken = true; rb_inc_page(cpu_buffer, &next_page); @@ -1012,7 +1210,7 @@ __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer, /* we grabbed the lock before incrementing */ if (RB_WARN_ON(cpu_buffer, next_page == reader_page)) - goto out_unlock; + goto out_reset; /* * If for some reason, we had an interrupt storm that made @@ -1021,12 +1219,12 @@ __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer, */ if (unlikely(next_page == commit_page)) { WARN_ON_ONCE(1); - goto out_unlock; + goto out_reset; } if (next_page == head_page) { if (!(buffer->flags & RB_FL_OVERWRITE)) - goto out_unlock; + goto out_reset; /* tail_page has not moved yet? */ if (tail_page == cpu_buffer->tail_page) { @@ -1050,7 +1248,7 @@ __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer, cpu_buffer->tail_page = next_page; /* reread the time stamp */ - *ts = ring_buffer_time_stamp(cpu_buffer->cpu); + *ts = ring_buffer_time_stamp(buffer, cpu_buffer->cpu); cpu_buffer->tail_page->page->time_stamp = *ts; } @@ -1060,7 +1258,7 @@ __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer, if (tail < BUF_PAGE_SIZE) { /* Mark the rest of the page with padding */ event = __rb_page_index(tail_page, tail); - event->type = RINGBUF_TYPE_PADDING; + rb_event_set_padding(event); } if (tail <= BUF_PAGE_SIZE) @@ -1100,12 +1298,13 @@ __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer, return event; - out_unlock: + out_reset: /* reset write */ if (tail <= BUF_PAGE_SIZE) local_set(&tail_page->write, tail); - __raw_spin_unlock(&cpu_buffer->lock); + if (likely(lock_taken)) + __raw_spin_unlock(&cpu_buffer->lock); local_irq_restore(flags); return NULL; } @@ -1192,7 +1391,7 @@ rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer, if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000)) return NULL; - ts = ring_buffer_time_stamp(cpu_buffer->cpu); + ts = ring_buffer_time_stamp(cpu_buffer->buffer, cpu_buffer->cpu); /* * Only the first commit can update the timestamp. @@ -1265,7 +1464,6 @@ static DEFINE_PER_CPU(int, rb_need_resched); * ring_buffer_lock_reserve - reserve a part of the buffer * @buffer: the ring buffer to reserve from * @length: the length of the data to reserve (excluding event header) - * @flags: a pointer to save the interrupt flags * * Returns a reseverd event on the ring buffer to copy directly to. * The user of this interface will need to get the body to write into @@ -1278,9 +1476,7 @@ static DEFINE_PER_CPU(int, rb_need_resched); * If NULL is returned, then nothing has been allocated or locked. */ struct ring_buffer_event * -ring_buffer_lock_reserve(struct ring_buffer *buffer, - unsigned long length, - unsigned long *flags) +ring_buffer_lock_reserve(struct ring_buffer *buffer, unsigned long length) { struct ring_buffer_per_cpu *cpu_buffer; struct ring_buffer_event *event; @@ -1347,15 +1543,13 @@ static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer, * ring_buffer_unlock_commit - commit a reserved * @buffer: The buffer to commit to * @event: The event pointer to commit. - * @flags: the interrupt flags received from ring_buffer_lock_reserve. * * This commits the data to the ring buffer, and releases any locks held. * * Must be paired with ring_buffer_lock_reserve. */ int ring_buffer_unlock_commit(struct ring_buffer *buffer, - struct ring_buffer_event *event, - unsigned long flags) + struct ring_buffer_event *event) { struct ring_buffer_per_cpu *cpu_buffer; int cpu = raw_smp_processor_id(); @@ -1438,7 +1632,7 @@ int ring_buffer_write(struct ring_buffer *buffer, } EXPORT_SYMBOL_GPL(ring_buffer_write); -static inline int rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer) +static int rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer) { struct buffer_page *reader = cpu_buffer->reader_page; struct buffer_page *head = cpu_buffer->head_page; @@ -1528,12 +1722,15 @@ EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu); unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu) { struct ring_buffer_per_cpu *cpu_buffer; + unsigned long ret; if (!cpumask_test_cpu(cpu, buffer->cpumask)) return 0; cpu_buffer = buffer->buffers[cpu]; - return cpu_buffer->entries; + ret = cpu_buffer->entries; + + return ret; } EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu); @@ -1545,12 +1742,15 @@ EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu); unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu) { struct ring_buffer_per_cpu *cpu_buffer; + unsigned long ret; if (!cpumask_test_cpu(cpu, buffer->cpumask)) return 0; cpu_buffer = buffer->buffers[cpu]; - return cpu_buffer->overrun; + ret = cpu_buffer->overrun; + + return ret; } EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu); @@ -1627,9 +1827,14 @@ static void rb_iter_reset(struct ring_buffer_iter *iter) */ void ring_buffer_iter_reset(struct ring_buffer_iter *iter) { - struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; + struct ring_buffer_per_cpu *cpu_buffer; unsigned long flags; + if (!iter) + return; + + cpu_buffer = iter->cpu_buffer; + spin_lock_irqsave(&cpu_buffer->reader_lock, flags); rb_iter_reset(iter); spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); @@ -1803,7 +2008,7 @@ static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer) event = rb_reader_event(cpu_buffer); - if (event->type == RINGBUF_TYPE_DATA) + if (event->type == RINGBUF_TYPE_DATA || rb_discarded_event(event)) cpu_buffer->entries--; rb_update_read_stamp(cpu_buffer, event); @@ -1864,9 +2069,6 @@ rb_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts) struct buffer_page *reader; int nr_loops = 0; - if (!cpumask_test_cpu(cpu, buffer->cpumask)) - return NULL; - cpu_buffer = buffer->buffers[cpu]; again: @@ -1889,9 +2091,18 @@ rb_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts) switch (event->type) { case RINGBUF_TYPE_PADDING: - RB_WARN_ON(cpu_buffer, 1); + if (rb_null_event(event)) + RB_WARN_ON(cpu_buffer, 1); + /* + * Because the writer could be discarding every + * event it creates (which would probably be bad) + * if we were to go back to "again" then we may never + * catch up, and will trigger the warn on, or lock + * the box. Return the padding, and we will release + * the current locks, and try again. + */ rb_advance_reader(cpu_buffer); - return NULL; + return event; case RINGBUF_TYPE_TIME_EXTEND: /* Internal data, OK to advance */ @@ -1906,7 +2117,8 @@ rb_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts) case RINGBUF_TYPE_DATA: if (ts) { *ts = cpu_buffer->read_stamp + event->time_delta; - ring_buffer_normalize_time_stamp(cpu_buffer->cpu, ts); + ring_buffer_normalize_time_stamp(buffer, + cpu_buffer->cpu, ts); } return event; @@ -1951,8 +2163,12 @@ rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts) switch (event->type) { case RINGBUF_TYPE_PADDING: - rb_inc_iter(iter); - goto again; + if (rb_null_event(event)) { + rb_inc_iter(iter); + goto again; + } + rb_advance_iter(iter); + return event; case RINGBUF_TYPE_TIME_EXTEND: /* Internal data, OK to advance */ @@ -1967,7 +2183,8 @@ rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts) case RINGBUF_TYPE_DATA: if (ts) { *ts = iter->read_stamp + event->time_delta; - ring_buffer_normalize_time_stamp(cpu_buffer->cpu, ts); + ring_buffer_normalize_time_stamp(buffer, + cpu_buffer->cpu, ts); } return event; @@ -1995,10 +2212,19 @@ ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts) struct ring_buffer_event *event; unsigned long flags; + if (!cpumask_test_cpu(cpu, buffer->cpumask)) + return NULL; + + again: spin_lock_irqsave(&cpu_buffer->reader_lock, flags); event = rb_buffer_peek(buffer, cpu, ts); spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); + if (event && event->type == RINGBUF_TYPE_PADDING) { + cpu_relax(); + goto again; + } + return event; } @@ -2017,10 +2243,16 @@ ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts) struct ring_buffer_event *event; unsigned long flags; + again: spin_lock_irqsave(&cpu_buffer->reader_lock, flags); event = rb_iter_peek(iter, ts); spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); + if (event && event->type == RINGBUF_TYPE_PADDING) { + cpu_relax(); + goto again; + } + return event; } @@ -2035,24 +2267,37 @@ ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts) struct ring_buffer_event * ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts) { - struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; - struct ring_buffer_event *event; + struct ring_buffer_per_cpu *cpu_buffer; + struct ring_buffer_event *event = NULL; unsigned long flags; + again: + /* might be called in atomic */ + preempt_disable(); + if (!cpumask_test_cpu(cpu, buffer->cpumask)) - return NULL; + goto out; + cpu_buffer = buffer->buffers[cpu]; spin_lock_irqsave(&cpu_buffer->reader_lock, flags); event = rb_buffer_peek(buffer, cpu, ts); if (!event) - goto out; + goto out_unlock; rb_advance_reader(cpu_buffer); - out: + out_unlock: spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); + out: + preempt_enable(); + + if (event && event->type == RINGBUF_TYPE_PADDING) { + cpu_relax(); + goto again; + } + return event; } EXPORT_SYMBOL_GPL(ring_buffer_consume); @@ -2131,6 +2376,7 @@ ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts) struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; unsigned long flags; + again: spin_lock_irqsave(&cpu_buffer->reader_lock, flags); event = rb_iter_peek(iter, ts); if (!event) @@ -2140,6 +2386,11 @@ ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts) out: spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); + if (event && event->type == RINGBUF_TYPE_PADDING) { + cpu_relax(); + goto again; + } + return event; } EXPORT_SYMBOL_GPL(ring_buffer_read); @@ -2232,6 +2483,7 @@ int ring_buffer_empty(struct ring_buffer *buffer) if (!rb_per_cpu_empty(cpu_buffer)) return 0; } + return 1; } EXPORT_SYMBOL_GPL(ring_buffer_empty); @@ -2244,12 +2496,16 @@ EXPORT_SYMBOL_GPL(ring_buffer_empty); int ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu) { struct ring_buffer_per_cpu *cpu_buffer; + int ret; if (!cpumask_test_cpu(cpu, buffer->cpumask)) return 1; cpu_buffer = buffer->buffers[cpu]; - return rb_per_cpu_empty(cpu_buffer); + ret = rb_per_cpu_empty(cpu_buffer); + + + return ret; } EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu); @@ -2268,18 +2524,36 @@ int ring_buffer_swap_cpu(struct ring_buffer *buffer_a, { struct ring_buffer_per_cpu *cpu_buffer_a; struct ring_buffer_per_cpu *cpu_buffer_b; + int ret = -EINVAL; if (!cpumask_test_cpu(cpu, buffer_a->cpumask) || !cpumask_test_cpu(cpu, buffer_b->cpumask)) - return -EINVAL; + goto out; /* At least make sure the two buffers are somewhat the same */ if (buffer_a->pages != buffer_b->pages) - return -EINVAL; + goto out; + + ret = -EAGAIN; + + if (ring_buffer_flags != RB_BUFFERS_ON) + goto out; + + if (atomic_read(&buffer_a->record_disabled)) + goto out; + + if (atomic_read(&buffer_b->record_disabled)) + goto out; cpu_buffer_a = buffer_a->buffers[cpu]; cpu_buffer_b = buffer_b->buffers[cpu]; + if (atomic_read(&cpu_buffer_a->record_disabled)) + goto out; + + if (atomic_read(&cpu_buffer_b->record_disabled)) + goto out; + /* * We can't do a synchronize_sched here because this * function can be called in atomic context. @@ -2298,18 +2572,21 @@ int ring_buffer_swap_cpu(struct ring_buffer *buffer_a, atomic_dec(&cpu_buffer_a->record_disabled); atomic_dec(&cpu_buffer_b->record_disabled); - return 0; + ret = 0; +out: + return ret; } EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu); static void rb_remove_entries(struct ring_buffer_per_cpu *cpu_buffer, - struct buffer_data_page *bpage) + struct buffer_data_page *bpage, + unsigned int offset) { struct ring_buffer_event *event; unsigned long head; __raw_spin_lock(&cpu_buffer->lock); - for (head = 0; head < local_read(&bpage->commit); + for (head = offset; head < local_read(&bpage->commit); head += rb_event_length(event)) { event = __rb_data_page_index(bpage, head); @@ -2340,8 +2617,8 @@ static void rb_remove_entries(struct ring_buffer_per_cpu *cpu_buffer, */ void *ring_buffer_alloc_read_page(struct ring_buffer *buffer) { - unsigned long addr; struct buffer_data_page *bpage; + unsigned long addr; addr = __get_free_page(GFP_KERNEL); if (!addr) @@ -2349,6 +2626,8 @@ void *ring_buffer_alloc_read_page(struct ring_buffer *buffer) bpage = (void *)addr; + rb_init_page(bpage); + return bpage; } @@ -2368,6 +2647,7 @@ void ring_buffer_free_read_page(struct ring_buffer *buffer, void *data) * ring_buffer_read_page - extract a page from the ring buffer * @buffer: buffer to extract from * @data_page: the page to use allocated from ring_buffer_alloc_read_page + * @len: amount to extract * @cpu: the cpu of the buffer to extract * @full: should the extraction only happen when the page is full. * @@ -2377,12 +2657,12 @@ void ring_buffer_free_read_page(struct ring_buffer *buffer, void *data) * to swap with a page in the ring buffer. * * for example: - * rpage = ring_buffer_alloc_page(buffer); + * rpage = ring_buffer_alloc_read_page(buffer); * if (!rpage) * return error; - * ret = ring_buffer_read_page(buffer, &rpage, cpu, 0); - * if (ret) - * process_page(rpage); + * ret = ring_buffer_read_page(buffer, &rpage, len, cpu, 0); + * if (ret >= 0) + * process_page(rpage, ret); * * When @full is set, the function will not return true unless * the writer is off the reader page. @@ -2393,72 +2673,118 @@ void ring_buffer_free_read_page(struct ring_buffer *buffer, void *data) * responsible for that. * * Returns: - * 1 if data has been transferred - * 0 if no data has been transferred. + * >=0 if data has been transferred, returns the offset of consumed data. + * <0 if no data has been transferred. */ int ring_buffer_read_page(struct ring_buffer *buffer, - void **data_page, int cpu, int full) + void **data_page, size_t len, int cpu, int full) { struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; struct ring_buffer_event *event; struct buffer_data_page *bpage; + struct buffer_page *reader; unsigned long flags; - int ret = 0; + unsigned int commit; |