diff options
Diffstat (limited to 'fs/aio.c')
-rw-r--r-- | fs/aio.c | 106 |
1 files changed, 62 insertions, 44 deletions
@@ -97,6 +97,8 @@ struct kioctx { struct aio_ring_info ring_info; + spinlock_t completion_lock; + struct rcu_head rcu_head; struct work_struct rcu_work; }; @@ -220,25 +222,51 @@ static int aio_setup_ring(struct kioctx *ctx) #define AIO_EVENTS_FIRST_PAGE ((PAGE_SIZE - sizeof(struct aio_ring)) / sizeof(struct io_event)) #define AIO_EVENTS_OFFSET (AIO_EVENTS_PER_PAGE - AIO_EVENTS_FIRST_PAGE) +void kiocb_set_cancel_fn(struct kiocb *req, kiocb_cancel_fn *cancel) +{ + struct kioctx *ctx = req->ki_ctx; + unsigned long flags; + + spin_lock_irqsave(&ctx->ctx_lock, flags); + + if (!req->ki_list.next) + list_add(&req->ki_list, &ctx->active_reqs); + + req->ki_cancel = cancel; + + spin_unlock_irqrestore(&ctx->ctx_lock, flags); +} +EXPORT_SYMBOL(kiocb_set_cancel_fn); + static int kiocb_cancel(struct kioctx *ctx, struct kiocb *kiocb, struct io_event *res) { - int (*cancel)(struct kiocb *, struct io_event *); + kiocb_cancel_fn *old, *cancel; int ret = -EINVAL; - cancel = kiocb->ki_cancel; - kiocbSetCancelled(kiocb); - if (cancel) { - atomic_inc(&kiocb->ki_users); - spin_unlock_irq(&ctx->ctx_lock); + /* + * Don't want to set kiocb->ki_cancel = KIOCB_CANCELLED unless it + * actually has a cancel function, hence the cmpxchg() + */ + + cancel = ACCESS_ONCE(kiocb->ki_cancel); + do { + if (!cancel || cancel == KIOCB_CANCELLED) + return ret; - memset(res, 0, sizeof(*res)); - res->obj = (u64)(unsigned long)kiocb->ki_obj.user; - res->data = kiocb->ki_user_data; - ret = cancel(kiocb, res); + old = cancel; + cancel = cmpxchg(&kiocb->ki_cancel, old, KIOCB_CANCELLED); + } while (cancel != old); - spin_lock_irq(&ctx->ctx_lock); - } + atomic_inc(&kiocb->ki_users); + spin_unlock_irq(&ctx->ctx_lock); + + memset(res, 0, sizeof(*res)); + res->obj = (u64)(unsigned long)kiocb->ki_obj.user; + res->data = kiocb->ki_user_data; + ret = cancel(kiocb, res); + + spin_lock_irq(&ctx->ctx_lock); return ret; } @@ -326,6 +354,7 @@ static struct kioctx *ioctx_alloc(unsigned nr_events) atomic_set(&ctx->users, 2); atomic_set(&ctx->dead, 0); spin_lock_init(&ctx->ctx_lock); + spin_lock_init(&ctx->completion_lock); mutex_init(&ctx->ring_info.ring_lock); init_waitqueue_head(&ctx->wait); @@ -468,20 +497,12 @@ static struct kiocb *__aio_get_req(struct kioctx *ctx) { struct kiocb *req = NULL; - req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL); + req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL|__GFP_ZERO); if (unlikely(!req)) return NULL; - req->ki_flags = 0; atomic_set(&req->ki_users, 2); - req->ki_key = 0; req->ki_ctx = ctx; - req->ki_cancel = NULL; - req->ki_retry = NULL; - req->ki_dtor = NULL; - req->private = NULL; - req->ki_iovec = NULL; - req->ki_eventfd = NULL; return req; } @@ -512,7 +533,6 @@ static void kiocb_batch_free(struct kioctx *ctx, struct kiocb_batch *batch) spin_lock_irq(&ctx->ctx_lock); list_for_each_entry_safe(req, n, &batch->head, ki_batch) { list_del(&req->ki_batch); - list_del(&req->ki_list); kmem_cache_free(kiocb_cachep, req); atomic_dec(&ctx->reqs_active); } @@ -559,10 +579,7 @@ static int kiocb_batch_refill(struct kioctx *ctx, struct kiocb_batch *batch) } batch->count -= allocated; - list_for_each_entry(req, &batch->head, ki_batch) { - list_add(&req->ki_list, &ctx->active_reqs); - atomic_inc(&ctx->reqs_active); - } + atomic_add(allocated, &ctx->reqs_active); kunmap_atomic(ring); spin_unlock_irq(&ctx->ctx_lock); @@ -653,25 +670,34 @@ void aio_complete(struct kiocb *iocb, long res, long res2) info = &ctx->ring_info; /* - * Add a completion event to the ring buffer. Must be done holding - * ctx->ctx_lock to prevent other code from messing with the tail - * pointer since we might be called from irq context. - * * Take rcu_read_lock() in case the kioctx is being destroyed, as we * need to issue a wakeup after decrementing reqs_active. */ rcu_read_lock(); - spin_lock_irqsave(&ctx->ctx_lock, flags); - list_del(&iocb->ki_list); /* remove from active_reqs */ + if (iocb->ki_list.next) { + unsigned long flags; + + spin_lock_irqsave(&ctx->ctx_lock, flags); + list_del(&iocb->ki_list); + spin_unlock_irqrestore(&ctx->ctx_lock, flags); + } /* * cancelled requests don't get events, userland was given one * when the event got cancelled. */ - if (kiocbIsCancelled(iocb)) + if (unlikely(xchg(&iocb->ki_cancel, + KIOCB_CANCELLED) == KIOCB_CANCELLED)) goto put_rq; + /* + * Add a completion event to the ring buffer. Must be done holding + * ctx->ctx_lock to prevent other code from messing with the tail + * pointer since we might be called from irq context. + */ + spin_lock_irqsave(&ctx->completion_lock, flags); + tail = info->tail; pos = tail + AIO_EVENTS_OFFSET; @@ -705,6 +731,8 @@ void aio_complete(struct kiocb *iocb, long res, long res2) kunmap_atomic(ring); flush_dcache_page(info->ring_pages[0]); + spin_unlock_irqrestore(&ctx->completion_lock, flags); + pr_debug("added to ring %p at [%u]\n", iocb, tail); /* @@ -731,7 +759,6 @@ put_rq: if (waitqueue_active(&ctx->wait)) wake_up(&ctx->wait); - spin_unlock_irqrestore(&ctx->ctx_lock, flags); rcu_read_unlock(); } EXPORT_SYMBOL(aio_complete); @@ -1216,15 +1243,10 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb, req->ki_opcode = iocb->aio_lio_opcode; ret = aio_setup_iocb(req, compat); - if (ret) goto out_put_req; - if (unlikely(kiocbIsCancelled(req))) - ret = -EINTR; - else - ret = req->ki_retry(req); - + ret = req->ki_retry(req); if (ret != -EIOCBQUEUED) { /* * There's no easy way to restart the syscall since other AIO's @@ -1241,10 +1263,6 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb, return 0; out_put_req: - spin_lock_irq(&ctx->ctx_lock); - list_del(&req->ki_list); - spin_unlock_irq(&ctx->ctx_lock); - atomic_dec(&ctx->reqs_active); aio_put_req(req); /* drop extra ref to req */ aio_put_req(req); /* drop i/o ref to req */ |