#include <linux/slab.h>
#include <linux/highmem.h>
#include <linux/pagemap.h>
+#include <linux/task_io_accounting_ops.h>
#include <linux/bio.h>
#include <linux/wait.h>
#include <linux/err.h>
int page_errors; /* errno from get_user_pages() */
/* BIO completion state */
+ atomic_t refcount; /* direct_io_worker() and bios */
spinlock_t bio_lock; /* protects BIO fields below */
- int bio_count; /* nr bios to be completed */
- int bios_in_flight; /* nr bios in flight */
struct bio *bio_list; /* singly linked via bi_private */
struct task_struct *waiter; /* waiting task (NULL if none) */
NULL); /* vmas */
up_read(¤t->mm->mmap_sem);
- if (ret < 0 && dio->blocks_available && (dio->rw == WRITE)) {
+ if (ret < 0 && dio->blocks_available && (dio->rw & WRITE)) {
struct page *page = ZERO_PAGE(dio->curr_user_address);
/*
* A memory fault, but the filesystem has some outstanding
return dio->pages[dio->head++];
}
-/*
- * Called when all DIO BIO I/O has been completed - let the filesystem
- * know, if it registered an interest earlier via get_block. Pass the
- * private field of the map buffer_head so that filesystems can use it
- * to hold additional state between get_block calls and dio_complete.
+/**
+ * dio_complete() - called when all DIO BIO I/O has been completed
+ * @offset: the byte offset in the file of the completed operation
+ *
+ * This releases locks as dictated by the locking type, lets interested parties
+ * know that a DIO operation has completed, and calculates the resulting return
+ * code for the operation.
+ *
+ * It lets the filesystem know if it registered an interest earlier via
+ * get_block. Pass the private field of the map buffer_head so that
+ * filesystems can use it to hold additional state between get_block calls and
+ * dio_complete.
*/
-static void dio_complete(struct dio *dio, loff_t offset, ssize_t bytes)
+static int dio_complete(struct dio *dio, loff_t offset, int ret)
{
+ ssize_t transferred = 0;
+
+ if (dio->result) {
+ transferred = dio->result;
+
+ /* Check for short read case */
+ if ((dio->rw == READ) && ((offset + transferred) > dio->i_size))
+ transferred = dio->i_size - offset;
+ }
+
if (dio->end_io && dio->result)
- dio->end_io(dio->iocb, offset, bytes, dio->map_bh.b_private);
+ dio->end_io(dio->iocb, offset, transferred,
+ dio->map_bh.b_private);
if (dio->lock_type == DIO_LOCKING)
- up_read(&dio->inode->i_alloc_sem);
+ /* lockdep: non-owner release */
+ up_read_non_owner(&dio->inode->i_alloc_sem);
+
+ if (ret == 0)
+ ret = dio->page_errors;
+ if (ret == 0)
+ ret = dio->io_error;
+ if (ret == 0)
+ ret = transferred;
+
+ return ret;
}
/*
* Called when a BIO has been processed. If the count goes to zero then IO is
* complete and we can signal this to the AIO layer.
*/
-static void finished_one_bio(struct dio *dio)
+static void dio_complete_aio(struct dio *dio)
{
unsigned long flags;
+ int ret;
- spin_lock_irqsave(&dio->bio_lock, flags);
- if (dio->bio_count == 1) {
- if (dio->is_async) {
- ssize_t transferred;
- loff_t offset;
-
- /*
- * Last reference to the dio is going away.
- * Drop spinlock and complete the DIO.
- */
- spin_unlock_irqrestore(&dio->bio_lock, flags);
-
- /* Check for short read case */
- transferred = dio->result;
- offset = dio->iocb->ki_pos;
-
- if ((dio->rw == READ) &&
- ((offset + transferred) > dio->i_size))
- transferred = dio->i_size - offset;
-
- /* check for error in completion path */
- if (dio->io_error)
- transferred = dio->io_error;
-
- dio_complete(dio, offset, transferred);
+ ret = dio_complete(dio, dio->iocb->ki_pos, 0);
- /* Complete AIO later if falling back to buffered i/o */
- if (dio->result == dio->size ||
- ((dio->rw == READ) && dio->result)) {
- aio_complete(dio->iocb, transferred, 0);
- kfree(dio);
- return;
- } else {
- /*
- * Falling back to buffered
- */
- spin_lock_irqsave(&dio->bio_lock, flags);
- dio->bio_count--;
- if (dio->waiter)
- wake_up_process(dio->waiter);
- spin_unlock_irqrestore(&dio->bio_lock, flags);
- return;
- }
- }
+ /* Complete AIO later if falling back to buffered i/o */
+ if (dio->result == dio->size ||
+ ((dio->rw == READ) && dio->result)) {
+ aio_complete(dio->iocb, ret, 0);
+ kfree(dio);
+ } else {
+ /*
+ * Falling back to buffered
+ */
+ spin_lock_irqsave(&dio->bio_lock, flags);
+ if (dio->waiter)
+ wake_up_process(dio->waiter);
+ spin_unlock_irqrestore(&dio->bio_lock, flags);
}
- dio->bio_count--;
- spin_unlock_irqrestore(&dio->bio_lock, flags);
}
static int dio_bio_complete(struct dio *dio, struct bio *bio);
/* cleanup the bio */
dio_bio_complete(dio, bio);
+
+ if (atomic_dec_and_test(&dio->refcount))
+ dio_complete_aio(dio);
+
return 0;
}
spin_lock_irqsave(&dio->bio_lock, flags);
bio->bi_private = dio->bio_list;
dio->bio_list = bio;
- dio->bios_in_flight--;
- if (dio->waiter && dio->bios_in_flight == 0)
+ if ((atomic_sub_return(1, &dio->refcount) == 1) && dio->waiter)
wake_up_process(dio->waiter);
spin_unlock_irqrestore(&dio->bio_lock, flags);
return 0;
* In the AIO read case we speculatively dirty the pages before starting IO.
* During IO completion, any of these pages which happen to have been written
* back will be redirtied by bio_check_pages_dirty().
+ *
+ * bios hold a dio reference between submit_bio and ->end_io.
*/
static void dio_bio_submit(struct dio *dio)
{
struct bio *bio = dio->bio;
- unsigned long flags;
bio->bi_private = dio;
- spin_lock_irqsave(&dio->bio_lock, flags);
- dio->bio_count++;
- dio->bios_in_flight++;
- spin_unlock_irqrestore(&dio->bio_lock, flags);
+ atomic_inc(&dio->refcount);
if (dio->is_async && dio->rw == READ)
bio_set_pages_dirty(bio);
submit_bio(dio->rw, bio);
page_cache_release(dio_get_page(dio));
}
+static int wait_for_more_bios(struct dio *dio)
+{
+ assert_spin_locked(&dio->bio_lock);
+
+ return (atomic_read(&dio->refcount) > 1) && (dio->bio_list == NULL);
+}
+
/*
- * Wait for the next BIO to complete. Remove it and return it.
+ * Wait for the next BIO to complete. Remove it and return it. NULL is
+ * returned once all BIOs have been completed. This must only be called once
+ * all bios have been issued so that dio->refcount can only decrease. This
+ * requires that that the caller hold a reference on the dio.
*/
static struct bio *dio_await_one(struct dio *dio)
{
unsigned long flags;
- struct bio *bio;
+ struct bio *bio = NULL;
spin_lock_irqsave(&dio->bio_lock, flags);
- while (dio->bio_list == NULL) {
+ while (wait_for_more_bios(dio)) {
set_current_state(TASK_UNINTERRUPTIBLE);
- if (dio->bio_list == NULL) {
+ if (wait_for_more_bios(dio)) {
dio->waiter = current;
spin_unlock_irqrestore(&dio->bio_lock, flags);
- blk_run_address_space(dio->inode->i_mapping);
io_schedule();
spin_lock_irqsave(&dio->bio_lock, flags);
dio->waiter = NULL;
}
set_current_state(TASK_RUNNING);
}
- bio = dio->bio_list;
- dio->bio_list = bio->bi_private;
+ if (dio->bio_list) {
+ bio = dio->bio_list;
+ dio->bio_list = bio->bi_private;
+ }
spin_unlock_irqrestore(&dio->bio_lock, flags);
return bio;
}
}
bio_put(bio);
}
- finished_one_bio(dio);
return uptodate ? 0 : -EIO;
}
/*
- * Wait on and process all in-flight BIOs.
+ * Wait on and process all in-flight BIOs. This must only be called once
+ * all bios have been issued so that the refcount can only decrease.
+ * This just waits for all bios to make it through dio_bio_complete. IO
+ * errors are propogated through dio->io_error and should be propogated via
+ * dio_complete().
*/
-static int dio_await_completion(struct dio *dio)
+static void dio_await_completion(struct dio *dio)
{
- int ret = 0;
-
- if (dio->bio)
- dio_bio_submit(dio);
-
- /*
- * The bio_lock is not held for the read of bio_count.
- * This is ok since it is the dio_bio_complete() that changes
- * bio_count.
- */
- while (dio->bio_count) {
- struct bio *bio = dio_await_one(dio);
- int ret2;
-
- ret2 = dio_bio_complete(dio, bio);
- if (ret == 0)
- ret = ret2;
- }
- return ret;
+ struct bio *bio;
+ do {
+ bio = dio_await_one(dio);
+ if (bio)
+ dio_bio_complete(dio, bio);
+ } while (bio);
}
/*
map_bh->b_state = 0;
map_bh->b_size = fs_count << dio->inode->i_blkbits;
- create = dio->rw == WRITE;
+ create = dio->rw & WRITE;
if (dio->lock_type == DIO_LOCKING) {
if (dio->block_in_file < (i_size_read(dio->inode) >>
dio->blkbits))
{
int ret = 0;
+ if (dio->rw & WRITE) {
+ /*
+ * Read accounting is performed in submit_bio()
+ */
+ task_io_account_write(len);
+ }
+
/*
* Can we just grow the current page's presence in the dio?
*/
loff_t i_size_aligned;
/* AKPM: eargh, -ENOTBLK is a hack */
- if (dio->rw == WRITE) {
+ if (dio->rw & WRITE) {
page_cache_release(page);
return -ENOTBLK;
}
dio->iocb = iocb;
dio->i_size = i_size_read(inode);
- /*
- * BIO completion state.
- *
- * ->bio_count starts out at one, and we decrement it to zero after all
- * BIOs are submitted. This to avoid the situation where a really fast
- * (or synchronous) device could take the count to zero while we're
- * still submitting BIOs.
- */
- dio->bio_count = 1;
- dio->bios_in_flight = 0;
+ atomic_set(&dio->refcount, 1);
spin_lock_init(&dio->bio_lock);
dio->bio_list = NULL;
dio->waiter = NULL;
}
} /* end iovec loop */
- if (ret == -ENOTBLK && rw == WRITE) {
+ if (ret == -ENOTBLK && (rw & WRITE)) {
/*
* The remaining part of the request will be
* be handled by buffered I/O when we return
if (dio->bio)
dio_bio_submit(dio);
+ /* All IO is now issued, send it on its way */
+ blk_run_address_space(inode->i_mapping);
+
/*
* It is possible that, we return short IO due to end of file.
* In that case, we need to release all the pages we got hold on.
if (dio->is_async) {
int should_wait = 0;
- if (dio->result < dio->size && rw == WRITE) {
+ if (dio->result < dio->size && (rw & WRITE)) {
dio->waiter = current;
should_wait = 1;
}
if (ret == 0)
ret = dio->result;
- finished_one_bio(dio); /* This can free the dio */
- blk_run_address_space(inode->i_mapping);
+
+ /* this can free the dio */
+ if (atomic_dec_and_test(&dio->refcount))
+ dio_complete_aio(dio);
+
if (should_wait) {
unsigned long flags;
/*
spin_lock_irqsave(&dio->bio_lock, flags);
set_current_state(TASK_UNINTERRUPTIBLE);
- while (dio->bio_count) {
+ while (atomic_read(&dio->refcount)) {
spin_unlock_irqrestore(&dio->bio_lock, flags);
io_schedule();
spin_lock_irqsave(&dio->bio_lock, flags);
kfree(dio);
}
} else {
- ssize_t transferred = 0;
+ dio_await_completion(dio);
- finished_one_bio(dio);
- ret2 = dio_await_completion(dio);
- if (ret == 0)
- ret = ret2;
- if (ret == 0)
- ret = dio->page_errors;
- if (dio->result) {
- loff_t i_size = i_size_read(inode);
-
- transferred = dio->result;
- /*
- * Adjust the return value if the read crossed a
- * non-block-aligned EOF.
- */
- if (rw == READ && (offset + transferred > i_size))
- transferred = i_size - offset;
- }
- dio_complete(dio, offset, transferred);
- if (ret == 0)
- ret = transferred;
+ ret = dio_complete(dio, offset, ret);
/* We could have also come here on an AIO file extend */
- if (!is_sync_kiocb(iocb) && rw == WRITE &&
+ if (!is_sync_kiocb(iocb) && (rw & WRITE) &&
ret >= 0 && dio->result == dio->size)
/*
* For AIO writes where we have completed the
* i/o, we have to mark the the aio complete.
*/
aio_complete(iocb, ret, 0);
- kfree(dio);
+
+ if (atomic_dec_and_test(&dio->refcount))
+ kfree(dio);
+ else
+ BUG();
}
return ret;
}
int acquire_i_mutex = 0;
if (rw & WRITE)
- current->flags |= PF_SYNCWRITE;
+ rw = WRITE_SYNC;
if (bdev)
bdev_blkbits = blksize_bits(bdev_hardsect_size(bdev));
}
if (dio_lock_type == DIO_LOCKING)
- down_read(&inode->i_alloc_sem);
+ /* lockdep: not the owner will release it */
+ down_read_non_owner(&inode->i_alloc_sem);
}
/*
* even for AIO, we need to wait for i/o to complete before
* returning in this case.
*/
- dio->is_async = !is_sync_kiocb(iocb) && !((rw == WRITE) &&
+ dio->is_async = !is_sync_kiocb(iocb) && !((rw & WRITE) &&
(end > i_size_read(inode)));
retval = direct_io_worker(rw, iocb, inode, iov, offset,
mutex_unlock(&inode->i_mutex);
else if (acquire_i_mutex)
mutex_lock(&inode->i_mutex);
- if (rw & WRITE)
- current->flags &= ~PF_SYNCWRITE;
return retval;
}
EXPORT_SYMBOL(__blockdev_direct_IO);