<?xml version="1.0" encoding="UTF-8"?>
<rdf:RDF xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns="http://purl.org/rss/1.0/" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:syn="http://purl.org/rss/1.0/modules/syndication/" xmlns:admin="http://webns.net/mvcb/">
  <channel rdf:about="http://blog.gmane.org/gmane.linux.kernel.aio.general">
    <title>gmane.linux.kernel.aio.general</title>
    <link>http://blog.gmane.org/gmane.linux.kernel.aio.general</link>
    <description/>
    <syn:updatePeriod>hourly</syn:updatePeriod>
    <syn:updateFrequency>1</syn:updateFrequency>
    <syn:updateBase>1901-01-01T00:00+00:00</syn:updateBase>
    <items>
      <rdf:Seq>
        <rdf:li rdf:resource="http://comments.gmane.org/gmane.linux.kernel.aio.general/3062"/>
        <rdf:li rdf:resource="http://comments.gmane.org/gmane.linux.kernel.aio.general/3059"/>
        <rdf:li rdf:resource="http://comments.gmane.org/gmane.linux.kernel.aio.general/3056"/>
        <rdf:li rdf:resource="http://comments.gmane.org/gmane.linux.kernel.aio.general/3054"/>
        <rdf:li rdf:resource="http://comments.gmane.org/gmane.linux.kernel.aio.general/3051"/>
        <rdf:li rdf:resource="http://comments.gmane.org/gmane.linux.kernel.aio.general/3042"/>
        <rdf:li rdf:resource="http://comments.gmane.org/gmane.linux.kernel.aio.general/3036"/>
        <rdf:li rdf:resource="http://comments.gmane.org/gmane.linux.kernel.aio.general/3032"/>
        <rdf:li rdf:resource="http://comments.gmane.org/gmane.linux.kernel.aio.general/3024"/>
        <rdf:li rdf:resource="http://comments.gmane.org/gmane.linux.kernel.aio.general/3022"/>
        <rdf:li rdf:resource="http://comments.gmane.org/gmane.linux.kernel.aio.general/3019"/>
        <rdf:li rdf:resource="http://comments.gmane.org/gmane.linux.kernel.aio.general/2980"/>
        <rdf:li rdf:resource="http://comments.gmane.org/gmane.linux.kernel.aio.general/2975"/>
        <rdf:li rdf:resource="http://comments.gmane.org/gmane.linux.kernel.aio.general/2967"/>
        <rdf:li rdf:resource="http://comments.gmane.org/gmane.linux.kernel.aio.general/2966"/>
        <rdf:li rdf:resource="http://comments.gmane.org/gmane.linux.kernel.aio.general/2954"/>
        <rdf:li rdf:resource="http://comments.gmane.org/gmane.linux.kernel.aio.general/2951"/>
        <rdf:li rdf:resource="http://comments.gmane.org/gmane.linux.kernel.aio.general/2949"/>
        <rdf:li rdf:resource="http://comments.gmane.org/gmane.linux.kernel.aio.general/2940"/>
        <rdf:li rdf:resource="http://comments.gmane.org/gmane.linux.kernel.aio.general/2938"/>
      </rdf:Seq>
    </items>
    <image rdf:resource="http://gmane.org/img/gmane-25t.png"/>
    <textinput rdf:resource=""/>
  </channel>
  <image rdf:about="http://gmane.org/img/gmane-25t.png">
    <title>Gmane</title>
    <url>http://gmane.org/img/gmane-25t.png</url>
    <link>http://gmane.org</link>
  </image>
  <item rdf:about="http://comments.gmane.org/gmane.linux.kernel.aio.general/3062">
    <title>libaio status</title>
    <link>http://comments.gmane.org/gmane.linux.kernel.aio.general/3062</link>
    <description>&lt;pre&gt;Hello. I'm looking for alternatives to do async I/O in Linux and it
looks like libaio is the only "native" way to do it, but the information
about it is really scarce and incomplete.

The libaio website (if [1] is the website) looks outdated and has a lot of
broken links. The manpage footers say Linux 2.4, and I can't find any
mention in them of the restrictions listed on the website, so I don't know
if those restrictions are up to date and accurate.

I found this document [2], which so far looks like the most
comprehensive and up-to-date resource available, but again, since it is not an
"official" document, I'm not sure how accurate it is.

My feeling is that libaio is only good for O_DIRECT I/O on raw block
devices, but I need it for regular files in an ext4 filesystem (ideally
I shouldn't impose a limitation on the filesystem to use, but I guess
I could do that if necessary). I need to do I/O in a server that needs
to have extremely low latency, so I can't afford any type of blocking.
Using threads is not a viable option for other reasons.

Would you say libaio is good for what I need to do? If the limits
mentioned in [2] for use with an ext4 filesystem are correct, is there
any way to overcome them?

Thanks in advance!

[1] http://lse.sourceforge.net/io/aio.html
[2] http://code.google.com/p/kernel/wiki/AIOUserGuide

&lt;/pre&gt;</description>
    <dc:creator>Leandro Lucarella</dc:creator>
    <dc:date>2012-05-15T13:46:22</dc:date>
  </item>
  <item rdf:about="http://comments.gmane.org/gmane.linux.kernel.aio.general/3059">
    <title>What's the usage of io_setup?</title>
    <link>http://comments.gmane.org/gmane.linux.kernel.aio.general/3059</link>
    <description>&lt;pre&gt;Hi,

I'm new to libaio, and have some questions about io_setup.

I walked through the code of io_setup, and found that it frees the ioctx
after obtaining ioctx-&amp;gt;user_id. So the caller just gets a handle for a freed
ioctx, right?

So what's the usage of io_submit in real-world applications?

thanks,
Ryan
&lt;/pre&gt;</description>
    <dc:creator>Ryan Wang</dc:creator>
    <dc:date>2012-04-19T01:58:58</dc:date>
  </item>
  <item rdf:about="http://comments.gmane.org/gmane.linux.kernel.aio.general/3056">
    <title>Where can I find the git tree for libaio?</title>
    <link>http://comments.gmane.org/gmane.linux.kernel.aio.general/3056</link>
    <description>&lt;pre&gt;Hi,

I've been studying libaio recently, and now I want to get the upstream libaio.
Could you tell me where I can find the git tree for libaio, please?

thanks,
Ryan
&lt;/pre&gt;</description>
    <dc:creator>Ryan Wang</dc:creator>
    <dc:date>2012-04-16T06:18:04</dc:date>
  </item>
  <item rdf:about="http://comments.gmane.org/gmane.linux.kernel.aio.general/3054">
    <title>[patch] aio: change a stray spin_unlock_bh() to spin_unlock()</title>
    <link>http://comments.gmane.org/gmane.linux.kernel.aio.general/3054</link>
    <description>&lt;pre&gt;We missed this spin_unlock_bh() when we removed the _bh from the other
locks in cb22bbe9f7 "aio: aio_nr_lock is taken only synchronously now"

Signed-off-by: Dan Carpenter &amp;lt;dan.carpenter&amp;lt; at &amp;gt;oracle.com&amp;gt;

diff --git a/fs/aio.c b/fs/aio.c
index 7b6b9d5..4f71627 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -280,7 +280,7 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
 spin_lock(&amp;amp;aio_nr_lock);
 if (aio_nr + nr_events &amp;gt; aio_max_nr ||
     aio_nr + nr_events &amp;lt; aio_nr) {
-spin_unlock_bh(&amp;amp;aio_nr_lock);
+spin_unlock(&amp;amp;aio_nr_lock);
 goto out_cleanup;
 }
 aio_nr += ctx-&amp;gt;max_reqs;

--
To unsubscribe, send a message with 'unsubscribe linux-aio' in
the body to majordomo&amp;lt; at &amp;gt;kvack.org.  For more info on Linux AIO,
see: http://www.kvack.org/aio/
Don't email: &amp;lt;a href=mailto:"aart&amp;lt; at &amp;gt;kvack.org"&amp;gt;aart&amp;lt; at &amp;gt;kvack.org&amp;lt;/a&amp;gt;

&lt;/pre&gt;</description>
    <dc:creator>Dan Carpenter</dc:creator>
    <dc:date>2012-03-20T13:09:19</dc:date>
  </item>
  <item rdf:about="http://comments.gmane.org/gmane.linux.kernel.aio.general/3051">
    <title>[patch] aio: wake up waiters when freeing unused kiocbs</title>
    <link>http://comments.gmane.org/gmane.linux.kernel.aio.general/3051</link>
    <description>&lt;pre&gt;Hi,

Bart Van Assche reported a hung fio process when either hot-removing
storage or when interrupting the fio process itself.  The (pruned) call
trace for the latter looks like so:

fio             D 0000000000000001     0  6849   6848 0x00000004
 ffff880092541b88 0000000000000046 ffff880000000000 ffff88012fa11dc0
 ffff88012404be70 ffff880092541fd8 ffff880092541fd8 ffff880092541fd8
 ffff880128b894d0 ffff88012404be70 ffff880092541b88 000000018106f24d
Call Trace:
 [&amp;lt;ffffffff813b683f&amp;gt;] schedule+0x3f/0x60
 [&amp;lt;ffffffff813b68ef&amp;gt;] io_schedule+0x8f/0xd0
 [&amp;lt;ffffffff81174410&amp;gt;] wait_for_all_aios+0xc0/0x100
 [&amp;lt;ffffffff81175385&amp;gt;] exit_aio+0x55/0xc0
 [&amp;lt;ffffffff810413cd&amp;gt;] mmput+0x2d/0x110
 [&amp;lt;ffffffff81047c1d&amp;gt;] exit_mm+0x10d/0x130
 [&amp;lt;ffffffff810482b1&amp;gt;] do_exit+0x671/0x860
 [&amp;lt;ffffffff81048804&amp;gt;] do_group_exit+0x44/0xb0
 [&amp;lt;ffffffff81058018&amp;gt;] get_signal_to_deliver+0x218/0x5a0
 [&amp;lt;ffffffff81002065&amp;gt;] do_signal+0x65/0x700
 [&amp;lt;ffffffff81002785&amp;gt;] do_notify_resume+0x65/0x80
 [&amp;lt;ffffffff813c0333&amp;gt;] int_signal+0x12/0x17

The problem lies with the allocation batching code.  It will
opportunistically allocate kiocbs, and then trim back the list of iocbs
when there is not enough room in the completion ring to hold all of the
events.  In the case above, pruning back the events ends up freeing the
last active request while the context is marked as dead, so the pruning
code is responsible for waking up waiters.  Unfortunately, the code does
not check for this condition, so we end up with a hung task.
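
A toy model of that condition (a sketch only, not kernel code; the Ctx class,
its fields and the check_wake switch are made up for illustration and merely
mirror reqs_active and dead as described above):

```python
class Ctx:
    """Illustrative stand-in for an aio context; not the kernel's struct kioctx."""

    def __init__(self, reqs_active, dead):
        self.reqs_active = reqs_active  # requests still accounted to the context
        self.dead = dead                # context is being torn down
        self.waiters_woken = False      # stands in for waking the wait queue

    def batch_free(self, unused, check_wake=True):
        """Give back `unused` preallocated requests from a submission batch."""
        self.reqs_active -= unused
        # Whoever drops the last active request of a dead context has to wake
        # the task that is waiting for all requests to finish.
        if check_wake and self.reqs_active == 0 and self.dead:
            self.waiters_woken = True


fixed = Ctx(reqs_active=2, dead=True)
fixed.batch_free(2)
print(fixed.waiters_woken)
```

Calling batch_free(2, check_wake=False) on the same starting state leaves
waiters_woken unset, which corresponds to the hung task in the trace.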

Bart reports that the below patch has fixed the problem in his testing.

Cheers,
Jeff

Signed-off-by: Jeff Moyer &amp;lt;jmoyer&amp;lt; at &amp;gt;redhat.com&amp;gt;
Reported-and-Tested-by: Bart Van Assche &amp;lt;bvanassche&amp;lt; at &amp;gt;acm.org&amp;gt;

---
Note for stable: this should be applied to 3.2.

diff --git a/fs/aio.c b/fs/aio.c
index 969beb0..67e4b90 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -490,6 +490,8 @@ static void kiocb_batch_free(struct kioctx *ctx, struct kiocb_batch *batch)
 kmem_cache_free(kiocb_cachep, req);
 ctx-&amp;gt;reqs_active--;
 }
+if (unlikely(!ctx-&amp;gt;reqs_active &amp;amp;&amp;amp; ctx-&amp;gt;dead))
+wake_up_all(&amp;amp;ctx-&amp;gt;wait);
 spin_unlock_irq(&amp;amp;ctx-&amp;gt;ctx_lock);
 }
 


&lt;/pre&gt;</description>
    <dc:creator>Jeff Moyer</dc:creator>
    <dc:date>2012-02-16T19:56:15</dc:date>
  </item>
  <item rdf:about="http://comments.gmane.org/gmane.linux.kernel.aio.general/3042">
    <title>[PATCH] AIO: Don't plug the I/O queue in do_io_submit()</title>
    <link>http://comments.gmane.org/gmane.linux.kernel.aio.general/3042</link>
    <description>&lt;pre&gt;Asynchronous I/O latency to a solid-state disk greatly increased
between the 2.6.32 and 3.0 kernels. By removing the plug from
do_io_submit(), we observed a 34% improvement in the I/O latency.

Unfortunately, at this level, we don't know if the request is to
a rotating disk or not.

Signed-off-by: Dave Kleikamp &amp;lt;dave.kleikamp&amp;lt; at &amp;gt;oracle.com&amp;gt;
Cc: linux-aio&amp;lt; at &amp;gt;kvack.org
Cc: Chris Mason &amp;lt;chris.mason&amp;lt; at &amp;gt;oracle.com&amp;gt;
Cc: Jens Axboe &amp;lt;axboe&amp;lt; at &amp;gt;kernel.dk&amp;gt;
Cc: Andi Kleen &amp;lt;ak&amp;lt; at &amp;gt;linux.intel.com&amp;gt;
Cc: Jeff Moyer &amp;lt;jmoyer&amp;lt; at &amp;gt;redhat.com&amp;gt;

diff --git a/fs/aio.c b/fs/aio.c
index 78c514c..d131a2c 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -1696,7 +1696,6 @@ long do_io_submit(aio_context_t ctx_id, long nr,
 struct kioctx *ctx;
 long ret = 0;
 int i = 0;
-struct blk_plug plug;
 struct kiocb_batch batch;
 
 if (unlikely(nr &amp;lt; 0))
@@ -1716,8 +1715,6 @@ long do_io_submit(aio_context_t ctx_id, long nr,
 
 kiocb_batch_init(&amp;amp;batch, nr);
 
-blk_start_plug(&amp;amp;plug);
-
 /*
  * AKPM: should this return a partial result if some of the IOs were
  * successfully submitted?
@@ -1740,7 +1737,6 @@ long do_io_submit(aio_context_t ctx_id, long nr,
 if (ret)
 break;
 }
-blk_finish_plug(&amp;amp;plug);
 
 kiocb_batch_free(&amp;amp;batch);
 put_ioctx(ctx);


&lt;/pre&gt;</description>
    <dc:creator>Dave Kleikamp</dc:creator>
    <dc:date>2011-12-13T21:44:45</dc:date>
  </item>
  <item rdf:about="http://comments.gmane.org/gmane.linux.kernel.aio.general/3036">
    <title>AIO kernel.org websites</title>
    <link>http://comments.gmane.org/gmane.linux.kernel.aio.general/3036</link>
    <description>&lt;pre&gt;Hi,

I'm trying to find the correct websites for the AIO project. I've
tried a few kernel.org addresses, without any luck:

  (main page) http://www.kernel.org/pub/linux/libs/aio/

  (userspace GIT) git://git.kernel.org/pub/scm/libs/libaio/libaio.git

  (userspace FTP) ftp://ftp.kernel.org/pub/linux/libs/aio/

Should these addresses be working after September's kernel.org
infrastructure changes?

Best regards,
Dan Cecile


&lt;/pre&gt;</description>
    <dc:creator>Dan Cecile</dc:creator>
    <dc:date>2011-11-26T03:05:57</dc:date>
  </item>
  <item rdf:about="http://comments.gmane.org/gmane.linux.kernel.aio.general/3032">
    <title>[PATCH][RFC] A readahead complete notify approach to implement buffer aio</title>
    <link>http://comments.gmane.org/gmane.linux.kernel.aio.general/3032</link>
    <description>&lt;pre&gt;Currently libaio/aio has to use direct I/O; otherwise it falls back to sync I/O.
However, the aio core is already asynchronous by nature. This patch adds a completion
notification mechanism to implement buffered aio. The main idea is to do readahead()-like work in
io_submit(): count the non-uptodate pages associated with each iocb, put one reference per page
in the bio completion path just before unlock_page(), and finally hook the completion event onto
the aio ring buffer when the reference count drops to zero. In io_getevents(), we call vfs_read()
as a safety net, since there is still a small possibility that the pages that were brought in
have been reclaimed between io_submit() and io_getevents().
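
The reference counting described above can be sketched in user-space Python.
This is a toy model only, assuming one reference per page that needs I/O plus
one held by the submitter; the class and function names are hypothetical and
are not part of the patch:

```python
class BaIocb:
    """Toy control block: completes the request when the last ref is dropped."""

    def __init__(self, on_complete):
        self.ref = 1          # reference held by the submitter
        self.result = 0       # bytes reported by finished page reads
        self.io_error = 0     # first error seen, 0 if none
        self.on_complete = on_complete

    def get(self):
        self.ref += 1

    def put(self):
        self.ref -= 1
        if self.ref == 0:
            # Report the error if there was one, otherwise the byte count.
            self.on_complete(self.io_error or self.result)


def submit(pages_uptodate, on_complete):
    """Take one reference for every page that is not already up to date."""
    req = BaIocb(on_complete)
    queued = 0
    for uptodate in pages_uptodate:
        if not uptodate:
            req.get()
            queued += 1
    req.put()  # drop the submitter's reference once everything is queued
    return req, queued


def page_done(req, nbytes, ok=True):
    """Called once per queued page when its read finishes."""
    if not ok:
        req.io_error = -5  # stands in for -EIO
    req.result += nbytes
    req.put()


events = []
req, queued = submit([True, False, False], events.append)
page_done(req, 4096)
page_done(req, 4096)
print(queued, events)
```

Holding the submitter's own reference until all pages are queued keeps the
completion from firing early if a page read finishes while later pages are
still being submitted.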

I have tested this patch for a while. For small random I/O requests, its
performance is more or less the same as traditional aio; for big I/O requests,
the overhead of one extra memory copy shows up.

I think so far it has at least the following obvious drawbacks:

* mpage_readpage() is a really narrow interface. I have no way to pass down
the new control struct baiocb, so I just put it into struct task_struct and
refer to it via current as a workaround.

* the do_baio_read() routine is very similar to do_generic_file_read(), but
the latter is really hard to modify. I think we could push this code down into the
readahead path to reduce code duplication.

Hopefully the explanations are clear enough and don't muddy the water any further.
I figure the code does need some better comments, and any suggestions are welcome.

Signed-off-by: Zhu Yanhai &amp;lt;gaoyang.zyh&amp;lt; at &amp;gt;taobao.com&amp;gt;

---
 fs/aio.c                    |  319 ++++++++++++++++++++++++++++++++++++++++++-
 fs/buffer.c                 |   26 ++++-
 fs/mpage.c                  |   28 ++++-
 include/linux/aio.h         |    9 ++
 include/linux/aio_abi.h     |    1 +
 include/linux/blk_types.h   |    2 +
 include/linux/buffer_head.h |    3 +
 include/linux/page-flags.h  |    2 +
 include/linux/sched.h       |    1 +
 9 files changed, 386 insertions(+), 5 deletions(-)

diff --git a/fs/aio.c b/fs/aio.c
index e29ec48..19fc95e 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -53,6 +53,7 @@ unsigned long aio_max_nr = 0x10000; /* system wide maximum number of aio request

 static struct kmem_cache *kiocb_cachep;
 static struct kmem_cache *kioctx_cachep;
+static struct kmem_cache *ba_iocb_cachep;

 static struct workqueue_struct *aio_wq;

@@ -75,6 +76,7 @@ static int __init aio_setup(void)
kiocb_cachep = KMEM_CACHE(kiocb, SLAB_HWCACHE_ALIGN|SLAB_PANIC);
kioctx_cachep = KMEM_CACHE(kioctx,SLAB_HWCACHE_ALIGN|SLAB_PANIC);

+ba_iocb_cachep = KMEM_CACHE(ba_iocb, SLAB_HWCACHE_ALIGN|SLAB_PANIC);
aio_wq = alloc_workqueue("aio", 0, 1);/* used to limit concurrency */
BUG_ON(!aio_wq);

@@ -1074,19 +1076,79 @@ static inline void clear_timeout(struct aio_timeout *to)
del_singleshot_timer_sync(&amp;amp;to-&amp;gt;timer);
 }

+static int baio_vfs_read(unsigned int fd, char __user *buf,
+size_t count, loff_t pos)
+{
+struct file *file;
+ssize_t ret = -EBADF;
+int fput_needed;
+
+file = fget_light(fd, &amp;amp;fput_needed);
+if (file) {
+ret = vfs_read(file, buf, count, &amp;amp;pos);
+fput_light(file, fput_needed);
+}
+
+return ret;
+}
+static int baio_read_to_user(struct io_event *ent)
+{
+struct iocb __user *user_iocb;
+struct iocb tmp;
+int ret;
+
+user_iocb = (struct iocb *)(ent-&amp;gt;obj);
+if (unlikely(copy_from_user(&amp;amp;tmp, user_iocb, sizeof(tmp)))) {
+ret = -EFAULT;
+goto out;
+}
+
+ret = baio_vfs_read(tmp.aio_fildes, (char *)tmp.aio_buf,
+tmp.aio_nbytes, tmp.aio_offset);
+
+out:
+return ret;
+}
+
+/*
+ * Return 1 if ent-&amp;gt;obj points to a buffered aio iocb, 0 if it does
+ * not, or -EFAULT if the user iocb cannot be copied in.
+ */
+static int check_baio(struct io_event *ent)
+{
+struct iocb __user *user_iocb;
+struct iocb tmp;
+int ret;
+user_iocb = (struct iocb *)ent-&amp;gt;obj;
+if (unlikely(copy_from_user(&amp;amp;tmp, user_iocb, sizeof(tmp)))) {
+ret = -EFAULT;
+goto out;
+}
+
+if (tmp.aio_lio_opcode == IOCB_CMD_BAIO_PREAD)
+ret = 1;
+else
+ret = 0;
+out:
+return ret;
+
+}
 static int read_events(struct kioctx *ctx,
long min_nr, long nr,
struct io_event __user *event,
struct timespec __user *timeout)
+
 {
long start_jiffies = jiffies;
struct task_struct *tsk = current;
DECLARE_WAITQUEUE(wait, tsk);
int ret;
+int ret2;
int i = 0;
struct io_event ent;
struct aio_timeout to;
int retry = 0;
+int is_baio = 0;

/* needed to zero any padding within an entry (there shouldn't be
 * any, but C is fun!
@@ -1101,7 +1163,21 @@ retry:

dprintk("read event: %Lx %Lx %Lx %Lx\n",
ent.data, ent.obj, ent.res, ent.res2);
+is_baio = check_baio(&amp;amp;ent);
+if (unlikely(is_baio &amp;lt; 0)) {
+ret = is_baio;
+break;
+}

+if (is_baio) {
+ret2 = baio_read_to_user(&amp;amp;ent);
+if (unlikely(ret2 &amp;lt; 0)) {
+ret = ret2;
+dprintk("fail in baio_read_to_user: %d\n", ret);
+break;
+}
+ent.res = ret2;
+}
/* Could we split the check in two? */
ret = -EFAULT;
if (unlikely(copy_to_user(event, &amp;amp;ent, sizeof(ent)))) {
@@ -1167,12 +1243,27 @@ retry:
/*ret = aio_read_evt(ctx, &amp;amp;ent);*/
} while (1) ;

+
set_task_state(tsk, TASK_RUNNING);
remove_wait_queue(&amp;amp;ctx-&amp;gt;wait, &amp;amp;wait);

if (unlikely(ret &amp;lt;= 0))
break;

+is_baio = check_baio(&amp;amp;ent);
+if (unlikely(is_baio &amp;lt; 0)) {
+ret = is_baio;
+break;
+}
+if (is_baio) {
+ret2 = baio_read_to_user(&amp;amp;ent);
+if (unlikely(ret2 &amp;lt; 0)) {
+ret = ret2;
+dprintk("fail in baio_read_to_user: %d\n", ret);
+break;
+}
+ent.res = ret2;
+}
ret = -EFAULT;
if (unlikely(copy_to_user(event, &amp;amp;ent, sizeof(ent)))) {
dprintk("aio: lost an event due to EFAULT.\n");
@@ -1284,6 +1375,32 @@ SYSCALL_DEFINE1(io_destroy, aio_context_t, ctx)
return -EINVAL;
 }

+
+void baio_complete(struct ba_iocb *baiocb)
+{
+ssize_t ret = 0;
+if (baiocb-&amp;gt;io_error)
+ret = baiocb-&amp;gt;io_error;
+if (ret == 0)
+ret = baiocb-&amp;gt;result;
+dprintk("baio_complete: io_error: %d, result: %zd\n",
+baiocb-&amp;gt;io_error, baiocb-&amp;gt;result);
+
+aio_complete(baiocb-&amp;gt;iocb, ret, 0);
+
+}
+
+void baiocb_put(struct ba_iocb *baiocb)
+{
+BUG_ON(!baiocb);
+dprintk("baiocb_put: ref: %d\n", atomic_read(&amp;amp;baiocb-&amp;gt;ref));
+if (atomic_dec_and_test(&amp;amp;baiocb-&amp;gt;ref)) {
+baio_complete(baiocb);
+kmem_cache_free(ba_iocb_cachep, baiocb);
+}
+}
+EXPORT_SYMBOL(baiocb_put);
+
 static void aio_advance_iovec(struct kiocb *iocb, ssize_t ret)
 {
struct iovec *iov = &amp;amp;iocb-&amp;gt;ki_iovec[iocb-&amp;gt;ki_cur_seg];
@@ -1306,7 +1423,202 @@ static void aio_advance_iovec(struct kiocb *iocb, ssize_t ret)
 * the remaining iovecs */
BUG_ON(ret &amp;gt; 0 &amp;amp;&amp;amp; iocb-&amp;gt;ki_left == 0);
 }
+#define list_to_page(head) (list_entry((head)-&amp;gt;prev, struct page, lru))
+
+
+
+static void init_baiocb(struct ba_iocb *baiocb, struct kiocb *iocb)
+{
+atomic_set(&amp;amp;baiocb-&amp;gt;ref, 1);
+baiocb-&amp;gt;iocb = iocb;
+baiocb-&amp;gt;io_error = 0;
+baiocb-&amp;gt;result = 0;
+
+}
+static inline void baiocb_get(struct ba_iocb *baiocb)
+{
+BUG_ON(!baiocb);
+atomic_add(1, &amp;amp;baiocb-&amp;gt;ref);
+pr_debug("baiocb_add: ref: %d\n", atomic_read(&amp;amp;baiocb-&amp;gt;ref));
+}
+
+
+/*
+ * The error status is returned in desc-&amp;gt;error. On success, the
+ * number of bytes submitted for reading is reported.
+ * In fact the exact value doesn't matter because it will be
+ * ignored by the upper-level aio_run_iocb() in the async path,
+ * and our code won't be involved in the sync path
+ * anyway.
+ */
+void do_baio_read(struct file *file, struct kiocb *iocb, loff_t *ppos,
+read_descriptor_t *desc)
+{
+loff_t first_page_read_size;
+size_t count = desc-&amp;gt;count;
+struct ba_iocb *baiocb;
+
+unsigned long nr_pages_to_read, page_idx;
+ssize_t ret = 0;
+struct address_space *mapping;
+struct inode *inode;
+pgoff_t start, end, end_index;
+loff_t isize;
+LIST_HEAD(page_pool);
+struct page *page;
+
+
+start = *ppos &amp;gt;&amp;gt; PAGE_CACHE_SHIFT;
+end = (*ppos + count - 1) &amp;gt;&amp;gt; PAGE_CACHE_SHIFT;
+nr_pages_to_read = end - start + 1;
+desc-&amp;gt;error = 0;
+
+first_page_read_size = PAGE_CACHE_SIZE - (*ppos &amp;amp; ~PAGE_CACHE_MASK);
+
+mapping = file-&amp;gt;f_mapping;
+if (unlikely(!mapping-&amp;gt;a_ops-&amp;gt;readpage)) {
+desc-&amp;gt;error = -EINVAL;
+return;
+}
+
+baiocb = kmem_cache_alloc(ba_iocb_cachep, GFP_KERNEL);
+if (unlikely(!baiocb)) {
+desc-&amp;gt;error = -ENOMEM;
+return;
+}
+ /* allocate ba_iocb with one ref. */
+init_baiocb(baiocb, iocb);
+current-&amp;gt;current_baiocb = baiocb;
+
+inode = mapping-&amp;gt;host;
+isize = i_size_read(inode);
+end_index = ((isize - 1) &amp;gt;&amp;gt; PAGE_CACHE_SHIFT);

+for (page_idx = 0; page_idx &amp;lt; nr_pages_to_read; page_idx++) {
+pgoff_t page_offset = start + page_idx;
+unsigned long nr;
+
+if (page_offset &amp;gt; end_index)
+break;
+
+nr = PAGE_CACHE_SIZE;
+if (page_idx == 0)
+nr = first_page_read_size;
+if (count &amp;lt; nr)
+nr = count;
+count -= nr;
+find_page:
+page = find_get_page(mapping, page_offset);
+
+pr_debug("To read %lu bytes\n", nr);
+if (page) {
+ret = lock_page_killable(page);
+if (unlikely(ret)) {
+page_cache_release(page);
+desc-&amp;gt;error = ret;
+goto out;
+}
+if (PageUptodate(page)) {
+/* This won't go for IO. */
+pr_debug("To baiocb_put as page is uptodate.\n");
+unlock_page(page);
+page_cache_release(page);
+/* The page may be reclaimed from here on, which is not good.
+ * Todo: get_page, then make some page pool, release
+ * them after all bios are finished.
+ */
+/* mark_page_accessed(page); */
+desc-&amp;gt;written += nr;
+continue;
+}
+if (PageError(page))
+ClearPageError(page);
+} else {
+page = page_cache_alloc_cold(mapping);
+if (!page) {
+desc-&amp;gt;error = -ENOMEM;
+goto out;
+}
+
+ret = add_to_page_cache_lru(page, mapping,
+page_offset, GFP_KERNEL);
+if (ret) {
+page_cache_release(page);
+if (ret == -EEXIST) {
+pr_debug("to baiocb_put as it's there\n");
+ret = 0;
+} else {
+pr_debug("error in add_to_page_cache_lru\n");
+desc-&amp;gt;error = ret;
+goto out;
+}
+}
+}
+/* We hold an extra ref to the page after above, also the page
+ * has been locked
+ */
+BUG_ON(!page);
+BUG_ON(!PageLocked(page));
+SetPageBaio(page);
+pr_debug("To readpage() %lu\n", page_idx);
+baiocb_get(baiocb);
+ret = mapping-&amp;gt;a_ops-&amp;gt;readpage(file, page);
+if (unlikely(ret)) {
+baiocb_put(baiocb);
+if (ret == AOP_TRUNCATED_PAGE) {
+/* The AOP method that was handed a locked page
+ * has unlocked it. We just release the refcount
+ */
+ClearPageBaio(page);
+page_cache_release(page);
+goto find_page;
+}
+desc-&amp;gt;error = ret;
+goto out;
+}
+page_cache_release(page);
+}
+out:
+pr_debug("To the final baiocb_put()\n");
+baiocb_put(baiocb);
+current-&amp;gt;current_baiocb = NULL;
+return;
+
+}
+
+/*
+ * Return -EIOCBQUEUED on success. The exact number of bytes is
+ * ignored by the upper-level caller, so at least we don't have to
+ * make it very precise at this moment.
+ */
+ssize_t
+baio_read(struct kiocb *iocb, const struct iovec *iov,
+unsigned long nr_segs, loff_t pos)
+{
+int seg = 0;
+ssize_t written = 0;
+loff_t *ppos;
+
+BUG_ON(!iocb);
+ppos = &amp;amp;iocb-&amp;gt;ki_pos;
+for (seg = 0; seg &amp;lt; nr_segs; seg++) {
+read_descriptor_t desc;
+desc.written = 0;
+desc.arg.buf = iov[seg].iov_base;
+desc.count = iov[seg].iov_len;
+if (desc.count == 0)
+continue;
+desc.error = 0;
+do_baio_read(iocb-&amp;gt;ki_filp, iocb, ppos, &amp;amp;desc);
+written += desc.written;
+
+if (desc.error) {
+written = written ? : desc.error;
+break;
+}
+}
+return (written &amp;lt; 0) ? written : -EIOCBQUEUED;
+}
 static ssize_t aio_rw_vect_retry(struct kiocb *iocb)
 {
struct file *file = iocb-&amp;gt;ki_filp;
@@ -1321,6 +1633,9 @@ static ssize_t aio_rw_vect_retry(struct kiocb *iocb)
(iocb-&amp;gt;ki_opcode == IOCB_CMD_PREAD)) {
rw_op = file-&amp;gt;f_op-&amp;gt;aio_read;
opcode = IOCB_CMD_PREADV;
+} else if (iocb-&amp;gt;ki_opcode == IOCB_CMD_BAIO_PREAD) {
+rw_op = baio_read;
+opcode = IOCB_CMD_BAIO_PREAD;
} else {
rw_op = file-&amp;gt;f_op-&amp;gt;aio_write;
opcode = IOCB_CMD_PWRITEV;
@@ -1429,6 +1744,7 @@ static ssize_t aio_setup_iocb(struct kiocb *kiocb, bool compat)
ssize_t ret = 0;

switch (kiocb-&amp;gt;ki_opcode) {
+case IOCB_CMD_BAIO_PREAD:
case IOCB_CMD_PREAD:
ret = -EBADF;
if (unlikely(!(file-&amp;gt;f_mode &amp;amp; FMODE_READ)))
@@ -1794,6 +2110,7 @@ SYSCALL_DEFINE5(io_getevents, aio_context_t, ctx_id,
put_ioctx(ioctx);
}

-asmlinkage_protect(5, ret, ctx_id, min_nr, nr, events, timeout);
+asmlinkage_protect(5, ret, ctx_id, min_nr, nr,
+events, timeout);
return ret;
 }
diff --git a/fs/buffer.c b/fs/buffer.c
index 1a80b04..26d2bfe 100644
--- a/fs/buffer.c
+++ b/fs/buffer.c
@@ -52,6 +52,7 @@ init_buffer(struct buffer_head *bh, bh_end_io_t *handler, void *private)
 {
bh-&amp;gt;b_end_io = handler;
bh-&amp;gt;b_private = private;
+bh-&amp;gt;b_private2 = NULL;
 }
 EXPORT_SYMBOL(init_buffer);

@@ -309,7 +310,7 @@ static void end_buffer_async_read(struct buffer_head *bh, int uptodate)
struct buffer_head *tmp;
struct page *page;
int page_uptodate = 1;
-
+struct ba_iocb *baiocb;
BUG_ON(!buffer_async_read(bh));

page = bh-&amp;gt;b_page;
@@ -351,6 +352,18 @@ static void end_buffer_async_read(struct buffer_head *bh, int uptodate)
 */
if (page_uptodate &amp;amp;&amp;amp; !PageError(page))
SetPageUptodate(page);
+
+baiocb = (struct ba_iocb *)bh-&amp;gt;b_private2;
+BUG_ON(baiocb &amp;amp;&amp;amp; !PageBaio(page));
+BUG_ON(!baiocb &amp;amp;&amp;amp; PageBaio(page));
+
+if (baiocb &amp;amp;&amp;amp; PageBaio(page)) {
+ClearPageBaio(page);
+if (!page_uptodate || PageError(page))
+baiocb-&amp;gt;io_error = -EIO;
+baiocb-&amp;gt;result += PAGE_SIZE;
+baiocb_put(baiocb);
+}
unlock_page(page);
return;

@@ -2159,6 +2172,8 @@ int block_read_full_page(struct page *page, get_block_t *get_block)
 */
if (!PageError(page))
SetPageUptodate(page);
+if (PageBaio(page))
+baiocb_put(current-&amp;gt;current_baiocb);
unlock_page(page);
return 0;
}
@@ -2902,7 +2917,11 @@ static void end_bio_bh_io_sync(struct bio *bio, int err)
if (unlikely (test_bit(BIO_QUIET,&amp;amp;bio-&amp;gt;bi_flags)))
set_bit(BH_Quiet, &amp;amp;bh-&amp;gt;b_state);

+if (bio_flagged(bio, BIO_BAIO))
+bh-&amp;gt;b_private2 = (void *)bio-&amp;gt;bi_private2;
+
bh-&amp;gt;b_end_io(bh, test_bit(BIO_UPTODATE, &amp;amp;bio-&amp;gt;bi_flags));
+clear_bit(BIO_BAIO, &amp;amp;bio-&amp;gt;bi_flags);
bio_put(bio);
 }

@@ -2942,6 +2961,11 @@ int submit_bh(int rw, struct buffer_head * bh)
bio-&amp;gt;bi_end_io = end_bio_bh_io_sync;
bio-&amp;gt;bi_private = bh;

+if (PageBaio(bh-&amp;gt;b_page)) {
+set_bit(BIO_BAIO, &amp;amp;bio-&amp;gt;bi_flags);
+bio-&amp;gt;bi_private2 = (void *)current-&amp;gt;current_baiocb;
+}
+
bio_get(bio);
submit_bio(rw, bio);

diff --git a/fs/mpage.c b/fs/mpage.c
index fdfae9f..6bcfbed 100644
--- a/fs/mpage.c
+++ b/fs/mpage.c
@@ -58,6 +58,16 @@ static void mpage_end_io(struct bio *bio, int err)
ClearPageUptodate(page);
SetPageError(page);
}
+if (bio_flagged(bio, BIO_BAIO) &amp;amp;&amp;amp; PageBaio(page)) {
+struct ba_iocb *baiocb =
+(struct ba_iocb *)bio-&amp;gt;bi_private2;
+clear_bit(BIO_BAIO, &amp;amp;bio-&amp;gt;bi_flags);
+ClearPageBaio(page);
+if (!uptodate)
+baiocb-&amp;gt;io_error = -EIO;
+baiocb-&amp;gt;result += bvec-&amp;gt;bv_len;
+baiocb_put(baiocb);
+}
unlock_page(page);
} else { /* bio_data_dir(bio) == WRITE */
if (!uptodate) {
@@ -167,11 +177,12 @@ do_mpage_readpage(struct bio *bio, struct page *page, unsigned nr_pages,
unsigned page_block;
unsigned first_hole = blocks_per_page;
struct block_device *bdev = NULL;
-int length;
+int length, bio_length;
int fully_mapped = 1;
unsigned nblocks;
unsigned relative_block;

+
if (page_has_buffers(page))
goto confused;

@@ -265,6 +276,8 @@ do_mpage_readpage(struct bio *bio, struct page *page, unsigned nr_pages,
zero_user_segment(page, first_hole &amp;lt;&amp;lt; blkbits, PAGE_CACHE_SIZE);
if (first_hole == 0) {
SetPageUptodate(page);
+if (PageBaio(page))
+baiocb_put(current-&amp;gt;current_baiocb);
unlock_page(page);
goto out;
}
@@ -294,7 +307,13 @@ alloc_new:
}

length = first_hole &amp;lt;&amp;lt; blkbits;
-if (bio_add_page(bio, page, length, 0) &amp;lt; length) {
+bio_length = bio_add_page(bio, page, length, 0);
+if (PageBaio(page)) {
+bio-&amp;gt;bi_private2 = (void *)current-&amp;gt;current_baiocb;
+set_bit(BIO_BAIO, &amp;amp;bio-&amp;gt;bi_flags);
+}
+
+if (bio_length &amp;lt; length) {
bio = mpage_bio_submit(READ, bio);
goto alloc_new;
}
@@ -314,8 +333,11 @@ confused:
bio = mpage_bio_submit(READ, bio);
if (!PageUptodate(page))
        block_read_full_page(page, get_block);
-else
+else {
+if (PageBaio(page))
+baiocb_put(current-&amp;gt;current_baiocb);
unlock_page(page);
+    }
goto out;
 }

diff --git a/include/linux/aio.h b/include/linux/aio.h
index 2dcb72b..36ce4f2 100644
--- a/include/linux/aio.h
+++ b/include/linux/aio.h
@@ -202,6 +202,13 @@ struct kioctx {
struct rcu_head rcu_head;
 };

+struct ba_iocb {
+atomic_t ref;
+struct kiocb *iocb;
+int io_error;
+ssize_t result;
+};
+
 /* prototypes */
 extern unsigned aio_max_size;

@@ -214,6 +221,7 @@ struct mm_struct;
 extern void exit_aio(struct mm_struct *mm);
 extern long do_io_submit(aio_context_t ctx_id, long nr,
 struct iocb __user *__user *iocbpp, bool compat);
+extern void baiocb_put(struct ba_iocb *baiocb);
 #else
 static inline ssize_t wait_on_sync_kiocb(struct kiocb *iocb) { return 0; }
 static inline int aio_put_req(struct kiocb *iocb) { return 0; }
@@ -224,6 +232,7 @@ static inline void exit_aio(struct mm_struct *mm) { }
 static inline long do_io_submit(aio_context_t ctx_id, long nr,
struct iocb __user * __user *iocbpp,
bool compat) { return 0; }
+static void baiocb_put(struct ba_iocb *baiocb) { }
 #endif /* CONFIG_AIO */

 static inline struct kiocb *list_kiocb(struct list_head *h)
diff --git a/include/linux/aio_abi.h b/include/linux/aio_abi.h
index 2c87316..78c0bed 100644
--- a/include/linux/aio_abi.h
+++ b/include/linux/aio_abi.h
@@ -44,6 +44,7 @@ enum {
IOCB_CMD_NOOP = 6,
IOCB_CMD_PREADV = 7,
IOCB_CMD_PWRITEV = 8,
+IOCB_CMD_BAIO_PREAD = 9,
 };

 /*
diff --git a/include/linux/blk_types.h b/include/linux/blk_types.h
index 71fc53b..aba7dd1 100644
--- a/include/linux/blk_types.h
+++ b/include/linux/blk_types.h
@@ -68,6 +68,7 @@ struct bio {
bio_end_io_t *bi_end_io;

void *bi_private;
+void *bi_private2;
 #if defined(CONFIG_BLK_DEV_INTEGRITY)
struct bio_integrity_payload *bi_integrity;  /* data integrity */
 #endif
@@ -98,6 +99,7 @@ struct bio {
 #define BIO_FS_INTEGRITY 10 /* fs owns integrity data, not block layer */
 #define BIO_QUIET 11 /* Make BIO Quiet */
 #define BIO_MAPPED_INTEGRITY 12 /* integrity metadata has been remapped */
+#define BIO_BAIO 13 /* a buffered aio request */
 #define bio_flagged(bio, flag) ((bio)-&amp;gt;bi_flags &amp;amp; (1 &amp;lt;&amp;lt; (flag)))

 /*
diff --git a/include/linux/buffer_head.h b/include/linux/buffer_head.h
index 458f497..4ce40db 100644
--- a/include/linux/buffer_head.h
+++ b/include/linux/buffer_head.h
@@ -38,6 +38,7 @@ enum bh_state_bits {
BH_PrivateStart, /* not a state bit, but the first bit available
 * for private allocation by other entities
 */
+    BH_Baio,
 };

 #define MAX_BUF_PER_PAGE (PAGE_CACHE_SIZE / 512)
@@ -72,6 +73,7 @@ struct buffer_head {
struct address_space *b_assoc_map; /* mapping this buffer is
   associated with */
atomic_t b_count; /* users using this buffer_head */
+void *b_private2;
 };

 /*
@@ -124,6 +126,7 @@ BUFFER_FNS(Delay, delay)
 BUFFER_FNS(Boundary, boundary)
 BUFFER_FNS(Write_EIO, write_io_error)
 BUFFER_FNS(Unwritten, unwritten)
+BUFFER_FNS(Baio, baio)

 #define bh_offset(bh) ((unsigned long)(bh)-&amp;gt;b_data &amp;amp; ~PAGE_MASK)
 #define touch_buffer(bh) mark_page_accessed(bh-&amp;gt;b_page)
diff --git a/include/linux/page-flags.h b/include/linux/page-flags.h
index e90a673..fad65bc 100644
--- a/include/linux/page-flags.h
+++ b/include/linux/page-flags.h
@@ -107,6 +107,7 @@ enum pageflags {
 #ifdef CONFIG_TRANSPARENT_HUGEPAGE
PG_compound_lock,
 #endif
+PG_baio,
__NR_PAGEFLAGS,

/* Filesystems */
@@ -208,6 +209,7 @@ PAGEFLAG(Reserved, reserved) __CLEARPAGEFLAG(Reserved, reserved)
 PAGEFLAG(SwapBacked, swapbacked) __CLEARPAGEFLAG(SwapBacked, swapbacked)

 __PAGEFLAG(SlobFree, slob_free)
+PAGEFLAG(Baio, baio)

 /*
  * Private page markings that may be used by the filesystem that owns the page
diff --git a/include/linux/sched.h b/include/linux/sched.h
index e8acce7..aa42509 100644
--- a/include/linux/sched.h
+++ b/include/linux/sched.h
@@ -1566,6 +1566,7 @@ struct task_struct {
 #ifdef CONFIG_HAVE_HW_BREAKPOINT
atomic_t ptrace_bp_refcnt;
 #endif
+struct ba_iocb *current_baiocb;
 };

 /* Future-safe accessor for struct task_struct's cpus_allowed. */
--
1.7.4.4

--
To unsubscribe, send a message with 'unsubscribe linux-aio' in
the body to majordomo&amp;lt; at &amp;gt;kvack.org.  For more info on Linux AIO,
see: http://www.kvack.org/aio/
Don't email: &amp;lt;a href=mailto:"aart&amp;lt; at &amp;gt;kvack.org"&amp;gt;aart&amp;lt; at &amp;gt;kvack.org&amp;lt;/a&amp;gt;

&lt;/pre&gt;</description>
    <dc:creator>Zhu Yanhai</dc:creator>
    <dc:date>2011-11-01T09:00:24</dc:date>
  </item>
  <item rdf:about="http://comments.gmane.org/gmane.linux.kernel.aio.general/3024">
    <title>blocking io_submit</title>
    <link>http://comments.gmane.org/gmane.linux.kernel.aio.general/3024</link>
    <description>&lt;pre&gt;I've just realized how much time io_submit() spends
blocking.

In my test [1], run on Ubuntu 11.04, the majority of
the jobs submitted with io_submit() have completed by the
time it returns, i.e. the level of asynchrony is fairly
low (for an async API).

Graphing the output [2] shows that the test spends essentially
all its time blocked in io_submit() (as opposed to waiting
for completion events).

My attempt at a (user-level) work-around is to spawn a few
threads that spend their time submitting jobs. That way
my main disk thread won't get blocked, and I can react to
completion events much sooner. It also lets me submit jobs
sooner, by submitting from more than one thread.

Are there any caveats with this approach?

Does anyone have a better or different idea of ways to
work around this problem?

thanks,
&lt;/pre&gt;</description>
    <dc:creator>arvid&lt; at &gt;cs.umu.se</dc:creator>
    <dc:date>2011-09-23T02:25:33</dc:date>
  </item>
  <item rdf:about="http://comments.gmane.org/gmane.linux.kernel.aio.general/3022">
    <title>[patch, v3] aio: allocate kiocbs in batches</title>
    <link>http://comments.gmane.org/gmane.linux.kernel.aio.general/3022</link>
    <description>&lt;pre&gt;Hi,

In testing aio on a fast storage device, I found that the context lock
takes up a fair amount of cpu time in the I/O submission path.  The
reason is that we take it for every I/O submitted (see __aio_get_req).
Since we know how many I/Os are passed to io_submit, we can preallocate
the kiocbs in batches, reducing the number of times we take and release
the lock.  In my testing, I was able to reduce the amount of time spent
in _raw_spin_lock_irq by .56% (average of 3 runs).  The command I used
to test this was:
   aio-stress -O -o 2 -o 3 -r 8 -d 128 -b 32 -i 32 -s 16384 &amp;lt;dev&amp;gt;

I also tested the patch with various numbers of events passed to
io_submit, and I ran the xfstests aio group of tests to ensure I didn't
break anything.
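
As a rough illustration of the saving, here is a user-space sketch (not
kernel code). It assumes the only ctx lock round trip being counted is
the one in the allocation step, and it ignores the starvation retry and
failed allocations. The function names are made up for this example;
BATCH_SIZE mirrors KIOCB_BATCH_SIZE from the patch.

```c
#include "assert.h"

#define BATCH_SIZE 32L /* mirrors KIOCB_BATCH_SIZE in the patch */

/* Before the patch: one lock round trip per submitted request. */
static long lock_trips_unbatched(long nr)
{
	assert(nr >= 0);
	return nr;
}

/* With batching: one round trip per refill, i.e. ceil(nr / BATCH_SIZE). */
static long lock_trips_batched(long nr)
{
	assert(nr >= 0);
	return (nr + BATCH_SIZE - 1) / BATCH_SIZE;
}
```

Under these assumptions, a submission of 128 requests needs 4 round
trips for allocation instead of 128.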

Signed-off-by: Jeff Moyer &amp;lt;jmoyer&amp;lt; at &amp;gt;redhat.com&amp;gt;

---
Changes from v2 -&amp;gt; v3:
- got rid of an extraneous structure member in the kiocb_batch.
- fixed up some haphazard variable types
- fixed a build warning

Changes from rfc -&amp;gt; v2:
- folded in akpm's incremental patch which fixes coding style and
  variable names
- tried to clarify a comment about a starvation case
- fixed up my breaking of the handling of that starvation case
- moved from an on-stack array to a list at the suggestion of akpm

diff --git a/fs/aio.c b/fs/aio.c
index e29ec48..f2e65a8 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -440,8 +440,6 @@ void exit_aio(struct mm_struct *mm)
 static struct kiocb *__aio_get_req(struct kioctx *ctx)
 {
 struct kiocb *req = NULL;
-struct aio_ring *ring;
-int okay = 0;
 
 req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL);
 if (unlikely(!req))
@@ -459,39 +457,114 @@ static struct kiocb *__aio_get_req(struct kioctx *ctx)
 INIT_LIST_HEAD(&amp;amp;req-&amp;gt;ki_run_list);
 req-&amp;gt;ki_eventfd = NULL;
 
-/* Check if the completion queue has enough free space to
- * accept an event from this io.
- */
+return req;
+}
+
+/*
+ * struct kiocb's are allocated in batches to reduce the number of
+ * times the ctx lock is acquired and released.
+ */
+#define KIOCB_BATCH_SIZE 32L
+struct kiocb_batch {
+struct list_head head;
+long count; /* number of requests left to allocate */
+};
+
+static void kiocb_batch_init(struct kiocb_batch *batch, long total)
+{
+INIT_LIST_HEAD(&amp;amp;batch-&amp;gt;head);
+batch-&amp;gt;count = total;
+}
+
+static void kiocb_batch_free(struct kiocb_batch *batch)
+{
+struct kiocb *req, *n;
+
+list_for_each_entry_safe(req, n, &amp;amp;batch-&amp;gt;head, ki_batch) {
+list_del(&amp;amp;req-&amp;gt;ki_batch);
+kmem_cache_free(kiocb_cachep, req);
+}
+}
+
+/*
+ * Allocate a batch of kiocbs.  This avoids taking and dropping the
+ * context lock a lot during setup.
+ */
+static int kiocb_batch_refill(struct kioctx *ctx, struct kiocb_batch *batch)
+{
+unsigned short allocated, to_alloc;
+long avail;
+bool called_fput = false;
+struct kiocb *req, *n;
+struct aio_ring *ring;
+
+to_alloc = min(batch-&amp;gt;count, KIOCB_BATCH_SIZE);
+for (allocated = 0; allocated &amp;lt; to_alloc; allocated++) {
+req = __aio_get_req(ctx);
+if (!req)
+/* allocation failed, go with what we've got */
+break;
+list_add(&amp;amp;req-&amp;gt;ki_batch, &amp;amp;batch-&amp;gt;head);
+}
+
+if (allocated == 0)
+goto out;
+
+retry:
 spin_lock_irq(&amp;amp;ctx-&amp;gt;ctx_lock);
-ring = kmap_atomic(ctx-&amp;gt;ring_info.ring_pages[0], KM_USER0);
-if (ctx-&amp;gt;reqs_active &amp;lt; aio_ring_avail(&amp;amp;ctx-&amp;gt;ring_info, ring)) {
+ring = kmap_atomic(ctx-&amp;gt;ring_info.ring_pages[0]);
+
+avail = aio_ring_avail(&amp;amp;ctx-&amp;gt;ring_info, ring) - ctx-&amp;gt;reqs_active;
+BUG_ON(avail &amp;lt; 0);
+if (avail == 0 &amp;amp;&amp;amp; !called_fput) {
+/*
+ * Handle a potential starvation case.  It is possible that
+ * we hold the last reference on a struct file, causing us
+ * to delay the final fput to non-irq context.  In this case,
+ * ctx-&amp;gt;reqs_active is artificially high.  Calling the fput
+ * routine here may free up a slot in the event completion
+ * ring, allowing this allocation to succeed.
+ */
+kunmap_atomic(ring);
+spin_unlock_irq(&amp;amp;ctx-&amp;gt;ctx_lock);
+aio_fput_routine(NULL);
+called_fput = true;
+goto retry;
+}
+
+if (avail &amp;lt; allocated) {
+/* Trim back the number of requests. */
+list_for_each_entry_safe(req, n, &amp;amp;batch-&amp;gt;head, ki_batch) {
+list_del(&amp;amp;req-&amp;gt;ki_batch);
+kmem_cache_free(kiocb_cachep, req);
+if (--allocated &amp;lt;= avail)
+break;
+}
+}
+
+batch-&amp;gt;count -= allocated;
+list_for_each_entry(req, &amp;amp;batch-&amp;gt;head, ki_batch) {
 list_add(&amp;amp;req-&amp;gt;ki_list, &amp;amp;ctx-&amp;gt;active_reqs);
 ctx-&amp;gt;reqs_active++;
-okay = 1;
 }
-kunmap_atomic(ring, KM_USER0);
-spin_unlock_irq(&amp;amp;ctx-&amp;gt;ctx_lock);
 
-if (!okay) {
-kmem_cache_free(kiocb_cachep, req);
-req = NULL;
-}
+kunmap_atomic(ring);
+spin_unlock_irq(&amp;amp;ctx-&amp;gt;ctx_lock);
 
-return req;
+out:
+return allocated;
 }
 
-static inline struct kiocb *aio_get_req(struct kioctx *ctx)
+static inline struct kiocb *aio_get_req(struct kioctx *ctx,
+struct kiocb_batch *batch)
 {
 struct kiocb *req;
-/* Handle a potential starvation case -- should be exceedingly rare as 
- * requests will be stuck on fput_head only if the aio_fput_routine is 
- * delayed and the requests were the last user of the struct file.
- */
-req = __aio_get_req(ctx);
-if (unlikely(NULL == req)) {
-aio_fput_routine(NULL);
-req = __aio_get_req(ctx);
-}
+
+if (list_empty(&amp;amp;batch-&amp;gt;head))
+if (kiocb_batch_refill(ctx, batch) == 0)
+return NULL;
+req = list_first_entry(&amp;amp;batch-&amp;gt;head, struct kiocb, ki_batch);
+list_del(&amp;amp;req-&amp;gt;ki_batch);
 return req;
 }
 
@@ -1515,7 +1588,8 @@ static ssize_t aio_setup_iocb(struct kiocb *kiocb, bool compat)
 }
 
 static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
- struct iocb *iocb, bool compat)
+ struct iocb *iocb, struct kiocb_batch *batch,
+ bool compat)
 {
 struct kiocb *req;
 struct file *file;
@@ -1541,7 +1615,7 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
 if (unlikely(!file))
 return -EBADF;
 
-req = aio_get_req(ctx); /* returns with 2 references to req */
+req = aio_get_req(ctx, batch);  /* returns with 2 references to req */
 if (unlikely(!req)) {
 fput(file);
 return -EAGAIN;
@@ -1621,8 +1695,9 @@ long do_io_submit(aio_context_t ctx_id, long nr,
 {
 struct kioctx *ctx;
 long ret = 0;
-int i;
+int i = 0;
 struct blk_plug plug;
+struct kiocb_batch batch;
 
 if (unlikely(nr &amp;lt; 0))
 return -EINVAL;
@@ -1639,6 +1714,8 @@ long do_io_submit(aio_context_t ctx_id, long nr,
 return -EINVAL;
 }
 
+kiocb_batch_init(&amp;amp;batch, nr);
+
 blk_start_plug(&amp;amp;plug);
 
 /*
@@ -1659,12 +1736,13 @@ long do_io_submit(aio_context_t ctx_id, long nr,
 break;
 }
 
-ret = io_submit_one(ctx, user_iocb, &amp;amp;tmp, compat);
+ret = io_submit_one(ctx, user_iocb, &amp;amp;tmp, &amp;amp;batch, compat);
 if (ret)
 break;
 }
 blk_finish_plug(&amp;amp;plug);
 
+kiocb_batch_free(&amp;amp;batch);
 put_ioctx(ctx);
 return i ? i : ret;
 }
diff --git a/include/linux/aio.h b/include/linux/aio.h
index 2dcb72b..2314ad8 100644
--- a/include/linux/aio.h
+++ b/include/linux/aio.h
@@ -117,6 +117,7 @@ struct kiocb {
 
 struct list_head ki_list; /* the aio core uses this
  * for cancellation */
+struct list_head ki_batch; /* batch allocation */
 
 /*
  * If the aio_resfd field of the userspace iocb is not zero,

&lt;/pre&gt;</description>
    <dc:creator>Jeff Moyer</dc:creator>
    <dc:date>2011-09-22T16:41:51</dc:date>
  </item>
  <item rdf:about="http://comments.gmane.org/gmane.linux.kernel.aio.general/3019">
    <title>[patch, v2] aio: allocate kiocbs in batches</title>
    <link>http://comments.gmane.org/gmane.linux.kernel.aio.general/3019</link>
    <description>&lt;pre&gt;Hi,

In testing aio on a fast storage device, I found that the context lock
takes up a fair amount of cpu time in the I/O submission path.  The
reason is that we take it for every I/O submitted (see __aio_get_req).
Since we know how many I/Os are passed to io_submit, we can preallocate
the kiocbs in batches, reducing the number of times we take and release
the lock.  In my testing, I was able to reduce the amount of time spent
in _raw_spin_lock_irq by .56% (average of 3 runs).  The command I used
to test this was:
   aio-stress -O -o 2 -o 3 -r 8 -d 128 -b 32 -i 32 -s 16384 &amp;lt;dev&amp;gt;

I also tested the patch with various numbers of events passed to
io_submit, and I ran the xfstests aio group of tests to ensure I didn't
break anything.

Signed-off-by: Jeff Moyer &amp;lt;jmoyer&amp;lt; at &amp;gt;redhat.com&amp;gt;

---
Changes from rfc -&amp;gt; v2:
- folded in akpm's incremental patch which fixes coding style and
  variable names
- tried to clarify a comment about a starvation case
- fixed up my breaking of the handling of that starvation case
- moved from an on-stack array to a list at the suggestion of akpm


diff --git a/fs/aio.c b/fs/aio.c
index e29ec48..8229329 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -440,8 +440,6 @@ void exit_aio(struct mm_struct *mm)
 static struct kiocb *__aio_get_req(struct kioctx *ctx)
 {
 struct kiocb *req = NULL;
-struct aio_ring *ring;
-int okay = 0;
 
 req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL);
 if (unlikely(!req))
@@ -459,39 +457,116 @@ static struct kiocb *__aio_get_req(struct kioctx *ctx)
 INIT_LIST_HEAD(&amp;amp;req-&amp;gt;ki_run_list);
 req-&amp;gt;ki_eventfd = NULL;
 
-/* Check if the completion queue has enough free space to
- * accept an event from this io.
- */
+return req;
+}
+
+/*
+ * struct kiocb's are allocated in batches to reduce the number of
+ * times the ctx lock is acquired and released.
+ */
+#define KIOCB_BATCH_SIZE 32
+struct kiocb_batch {
+struct list_head head;
+long total; /* number of requests passed to sys_io_submit */
+long allocated; /* number of requests allocated so far */
+};
+
+static void kiocb_batch_init(struct kiocb_batch *batch, long total)
+{
+INIT_LIST_HEAD(&amp;amp;batch-&amp;gt;head);
+batch-&amp;gt;total = total;
+batch-&amp;gt;allocated = 0;
+}
+
+static void kiocb_batch_free(struct kiocb_batch *batch)
+{
+struct kiocb *req, *n;
+
+list_for_each_entry_safe(req, n, &amp;amp;batch-&amp;gt;head, ki_batch) {
+list_del(&amp;amp;req-&amp;gt;ki_batch);
+kmem_cache_free(kiocb_cachep, req);
+}
+}
+
+/*
+ * Allocate a batch of kiocbs.  This avoids taking and dropping the
+ * context lock a lot during setup.
+ */
+static int kiocb_batch_refill(struct kioctx *ctx, struct kiocb_batch *batch)
+{
+int i;
+int to_alloc, avail;
+bool called_fput = false;
+struct kiocb *req, *n;
+struct aio_ring *ring;
+
+to_alloc = min(batch-&amp;gt;total - batch-&amp;gt;allocated, KIOCB_BATCH_SIZE);
+for (i = 0; i &amp;lt; to_alloc; i++) {
+req = __aio_get_req(ctx);
+if (!req)
+/* allocation failed, go with what we've got */
+break;
+list_add(&amp;amp;req-&amp;gt;ki_batch, &amp;amp;batch-&amp;gt;head);
+}
+
+if (i == 0)
+goto out;
+
+retry:
 spin_lock_irq(&amp;amp;ctx-&amp;gt;ctx_lock);
-ring = kmap_atomic(ctx-&amp;gt;ring_info.ring_pages[0], KM_USER0);
-if (ctx-&amp;gt;reqs_active &amp;lt; aio_ring_avail(&amp;amp;ctx-&amp;gt;ring_info, ring)) {
+ring = kmap_atomic(ctx-&amp;gt;ring_info.ring_pages[0]);
+
+avail = aio_ring_avail(&amp;amp;ctx-&amp;gt;ring_info, ring) - ctx-&amp;gt;reqs_active;
+BUG_ON(avail &amp;lt; 0);
+if (avail == 0 &amp;amp;&amp;amp; !called_fput) {
+/*
+ * Handle a potential starvation case.  It is possible that
+ * we hold the last reference on a struct file, causing us
+ * to delay the final fput to non-irq context.  In this case,
+ * ctx-&amp;gt;reqs_active is artificially high.  Calling the fput
+ * routine here may free up a slot in the event completion
+ * ring, allowing this allocation to succeed.
+ */
+spin_unlock_irq(&amp;amp;ctx-&amp;gt;ctx_lock);
+kunmap_atomic(ring);
+aio_fput_routine(NULL);
+called_fput = true;
+goto retry;
+}
+
+if (avail &amp;lt; i) {
+/* Trim back the number of requests. */
+list_for_each_entry_safe(req, n, &amp;amp;batch-&amp;gt;head, ki_batch) {
+list_del(&amp;amp;req-&amp;gt;ki_batch);
+kmem_cache_free(kiocb_cachep, req);
+if (--i &amp;lt;= avail)
+break;
+}
+}
+
+batch-&amp;gt;allocated += i;
+list_for_each_entry(req, &amp;amp;batch-&amp;gt;head, ki_batch) {
 list_add(&amp;amp;req-&amp;gt;ki_list, &amp;amp;ctx-&amp;gt;active_reqs);
 ctx-&amp;gt;reqs_active++;
-okay = 1;
 }
-kunmap_atomic(ring, KM_USER0);
-spin_unlock_irq(&amp;amp;ctx-&amp;gt;ctx_lock);
 
-if (!okay) {
-kmem_cache_free(kiocb_cachep, req);
-req = NULL;
-}
+kunmap_atomic(ring);
+spin_unlock_irq(&amp;amp;ctx-&amp;gt;ctx_lock);
 
-return req;
+out:
+return i;
 }
 
-static inline struct kiocb *aio_get_req(struct kioctx *ctx)
+static inline struct kiocb *aio_get_req(struct kioctx *ctx,
+struct kiocb_batch *batch)
 {
 struct kiocb *req;
-/* Handle a potential starvation case -- should be exceedingly rare as 
- * requests will be stuck on fput_head only if the aio_fput_routine is 
- * delayed and the requests were the last user of the struct file.
- */
-req = __aio_get_req(ctx);
-if (unlikely(NULL == req)) {
-aio_fput_routine(NULL);
-req = __aio_get_req(ctx);
-}
+
+if (list_empty(&amp;amp;batch-&amp;gt;head))
+if (kiocb_batch_refill(ctx, batch) == 0)
+return NULL;
+req = list_first_entry(&amp;amp;batch-&amp;gt;head, struct kiocb, ki_batch);
+list_del(&amp;amp;req-&amp;gt;ki_batch);
 return req;
 }
 
@@ -1515,7 +1590,8 @@ static ssize_t aio_setup_iocb(struct kiocb *kiocb, bool compat)
 }
 
 static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
- struct iocb *iocb, bool compat)
+ struct iocb *iocb, struct kiocb_batch *batch,
+ bool compat)
 {
 struct kiocb *req;
 struct file *file;
@@ -1541,7 +1617,7 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
 if (unlikely(!file))
 return -EBADF;
 
-req = aio_get_req(ctx); /* returns with 2 references to req */
+req = aio_get_req(ctx, batch);  /* returns with 2 references to req */
 if (unlikely(!req)) {
 fput(file);
 return -EAGAIN;
@@ -1621,8 +1697,9 @@ long do_io_submit(aio_context_t ctx_id, long nr,
 {
 struct kioctx *ctx;
 long ret = 0;
-int i;
+int i = 0;
 struct blk_plug plug;
+struct kiocb_batch batch;
 
 if (unlikely(nr &amp;lt; 0))
 return -EINVAL;
@@ -1639,6 +1716,8 @@ long do_io_submit(aio_context_t ctx_id, long nr,
 return -EINVAL;
 }
 
+kiocb_batch_init(&amp;amp;batch, nr);
+
 blk_start_plug(&amp;amp;plug);
 
 /*
@@ -1659,12 +1738,13 @@ long do_io_submit(aio_context_t ctx_id, long nr,
 break;
 }
 
-ret = io_submit_one(ctx, user_iocb, &amp;amp;tmp, compat);
+ret = io_submit_one(ctx, user_iocb, &amp;amp;tmp, &amp;amp;batch, compat);
 if (ret)
 break;
 }
 blk_finish_plug(&amp;amp;plug);
 
+kiocb_batch_free(&amp;amp;batch);
 put_ioctx(ctx);
 return i ? i : ret;
 }
diff --git a/include/linux/aio.h b/include/linux/aio.h
index 2dcb72b..2314ad8 100644
--- a/include/linux/aio.h
+++ b/include/linux/aio.h
@@ -117,6 +117,7 @@ struct kiocb {
 
 struct list_head ki_list; /* the aio core uses this
  * for cancellation */
+struct list_head ki_batch; /* batch allocation */
 
 /*
  * If the aio_resfd field of the userspace iocb is not zero,

&lt;/pre&gt;</description>
    <dc:creator>Jeff Moyer</dc:creator>
    <dc:date>2011-09-21T17:16:00</dc:date>
  </item>
  <item rdf:about="http://comments.gmane.org/gmane.linux.kernel.aio.general/2980">
    <title>io_getevents() segfaults</title>
    <link>http://comments.gmane.org/gmane.linux.kernel.aio.general/2980</link>
    <description>&lt;pre&gt;Hi!
I've been fixing the libaio tests in LTP and found that io_getevents() may
segfault when given a random (invalid) ctx.

The manual, however, says that we should get EINVAL in this case.

The cause is code in io_getevents() that dereferences ctx to
check for an empty queue.

Jeff told me that this code was flawed and not really
implemented anyway, so the attached patch simply removes it.

&lt;/pre&gt;</description>
    <dc:creator>Cyril Hrubis</dc:creator>
    <dc:date>2011-03-23T17:10:23</dc:date>
  </item>
  <item rdf:about="http://comments.gmane.org/gmane.linux.kernel.aio.general/2975">
    <title>[PATCH] aio: Wake all waiters when destroying ctx</title>
    <link>http://comments.gmane.org/gmane.linux.kernel.aio.general/2975</link>
    <description>&lt;pre&gt;From: Roland Dreier &amp;lt;roland&amp;lt; at &amp;gt;purestorage.com&amp;gt;

The test program below will hang because io_getevents() uses
add_wait_queue_exclusive(), which means the wake_up() in io_destroy()
only wakes up one of the threads.  Fix this by using wake_up_all() in
the aio code paths where we want to make sure no one gets stuck.

// t.c -- compile with gcc -lpthread -laio t.c

#include &amp;lt;libaio.h&amp;gt;
#include &amp;lt;pthread.h&amp;gt;
#include &amp;lt;stdio.h&amp;gt;
#include &amp;lt;unistd.h&amp;gt;

static const int nthr = 2;

void *getev(void *ctx)
{
struct io_event ev;
io_getevents(ctx, 1, 1, &amp;amp;ev, NULL);
printf("io_getevents returned\n");
return NULL;
}

int main(int argc, char *argv[])
{
io_context_t ctx = 0;
pthread_t thread[nthr];
int i;

io_setup(1024, &amp;amp;ctx);

for (i = 0; i &amp;lt; nthr; ++i)
pthread_create(&amp;amp;thread[i], NULL, getev, ctx);

sleep(1);

io_destroy(ctx);

for (i = 0; i &amp;lt; nthr; ++i)
pthread_join(thread[i], NULL);

return 0;
}

Cc: &amp;lt;stable&amp;lt; at &amp;gt;kernel.org&amp;gt;
Signed-off-by: Roland Dreier &amp;lt;roland&amp;lt; at &amp;gt;purestorage.com&amp;gt;
---
 fs/aio.c |    4 ++--
 1 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/fs/aio.c b/fs/aio.c
index 26869cd..88f0ed5 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -520,7 +520,7 @@ static inline void really_put_req(struct kioctx *ctx, struct kiocb *req)
 ctx-&amp;gt;reqs_active--;
 
 if (unlikely(!ctx-&amp;gt;reqs_active &amp;amp;&amp;amp; ctx-&amp;gt;dead))
-wake_up(&amp;amp;ctx-&amp;gt;wait);
+wake_up_all(&amp;amp;ctx-&amp;gt;wait);
 }
 
 static void aio_fput_routine(struct work_struct *data)
@@ -1229,7 +1229,7 @@ static void io_destroy(struct kioctx *ioctx)
  * by other CPUs at this point.  Right now, we rely on the
  * locking done by the above calls to ensure this consistency.
  */
-wake_up(&amp;amp;ioctx-&amp;gt;wait);
+wake_up_all(&amp;amp;ioctx-&amp;gt;wait);
 put_ioctx(ioctx); /* once for the lookup */
 }
 

&lt;/pre&gt;</description>
    <dc:creator>Roland Dreier</dc:creator>
    <dc:date>2011-03-11T05:55:37</dc:date>
  </item>
  <item rdf:about="http://comments.gmane.org/gmane.linux.kernel.aio.general/2967">
    <title>The program.</title>
    <link>http://comments.gmane.org/gmane.linux.kernel.aio.general/2967</link>
    <description>&lt;pre&gt;Forgot to attach the program.
I know this program could be much better; the checking for errors like
ENOMEM in particular could be improved.
But it does what it was written for: AIO writes to an ordinary file
at a non-aligned offset.

Stef
&lt;/pre&gt;</description>
    <dc:creator>Stef Bon</dc:creator>
    <dc:date>2011-01-31T09:47:26</dc:date>
  </item>
  <item rdf:about="http://comments.gmane.org/gmane.linux.kernel.aio.general/2966">
    <title>Some questions.</title>
    <link>http://comments.gmane.org/gmane.linux.kernel.aio.general/2966</link>
    <description>&lt;pre&gt;Hello,

As I've already mentioned here, I'm working on a FUSE fs and want to
make it work with aio.

I have some questions:

a. The aiocp example is a very good example. It's much better than the
example in the io man page, which does not mention alignment at all.
My advice is to replace that. I've been trying to make aio work for
weeks now, and it's very frustrating not to find the right
documentation. Only a few days ago did I first realize how important
alignment is when doing aio. The manpage should mention that.

About alignment, Jeff Moyer told me that when reading from or writing to
a SATA block device, the required alignment is the (logical) block size,
which you can detect with BLKSSZGET.
Is it the same value you get from statvfs? My fs could look this up once
for every underlying fs/block device and store the value for later use,
instead of doing so every time it does an aio.

b. When looking at the aiocp example, at the start an array of iocbs is
initialised:

io_prep_pread(iocb_free[i], -1, buf, iosize, 0)

This is very new to me (and undocumented). What does it do?

c. Again, the aiocp program from above mentions that the start and the
end leftovers are written using other flags: not O_DIRECT but O_SYNC.
The size of the buffer is also not aligned, by definition, because
we're dealing with non-block-aligned data here.
But the io operations for the start and the end leftovers still work.
Is it still aio then?


You can say: when doing direct io -&amp;gt; buffer has to be aligned



d. What about aio and network shares, for example smb shares mounted with cifs?

I haven't tested it.

e. The attached program now uses aio read and write in combination
with epoll, signalfd and eventfd.
It uses aio reads and writes, but what's important is the aio
write. It determines the start leftover, the aligned middle and
the end leftover when doing a write.

This works; it doesn't have to be done when reading. The offset
(when doing an aio read) can start anywhere, and the buffer to be
read can end anywhere, yet it can be done in one read (at least when
it fits in one buffer). I can understand a little bit why, but
can you confirm this?

f. The way writes work in the example program I've written, a write
will result in three aio operations (again one for the start leftover,
one for the aligned middle and one for the end). It's also possible to do
the middle using aio and, when that has completed, do the start and
the end using normal io. Does anyone have experience with or examples of
that, or is this not such a good idea?
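
The split described in e. and f. can be sketched as plain offset
arithmetic. This is only a sketch under assumptions: split_write and the
two structs are hypothetical names, align stands for the logical block
size (for example the value BLKSSZGET reports), and it covers file
offsets only, not the alignment of the memory buffer that O_DIRECT also
needs.

```c
#include "assert.h"
#include "stddef.h"

/* One piece of a split write: where it starts in the file and its length. */
struct span {
	long long off;
	size_t len;
};

/* Head, aligned middle and tail of a write; unused pieces have len == 0. */
struct split {
	struct span head;   /* unaligned start leftover */
	struct span middle; /* block-aligned part, candidate for O_DIRECT */
	struct span tail;   /* unaligned end leftover */
};

/* Split [off, off + len) at multiples of align (hypothetical helper). */
static struct split split_write(long long off, size_t len, size_t align)
{
	struct split s;
	size_t head;

	assert(align > 0);
	assert(off >= 0);

	/* Bytes needed to reach the next aligned offset, capped at len. */
	head = (align - (size_t)(off % (long long)align)) % align;
	if (head > len)
		head = len;

	s.head.off = off;
	s.head.len = head;

	/* Largest multiple of align that fits in what remains. */
	s.middle.off = off + (long long)head;
	s.middle.len = ((len - head) / align) * align;

	s.tail.off = s.middle.off + (long long)s.middle.len;
	s.tail.len = len - head - s.middle.len;
	return s;
}
```

For example, a 2000-byte write at offset 100 with 512-byte blocks splits
into a 412-byte head at offset 100, a 1536-byte middle at offset 512 and
a 52-byte tail at offset 2048.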

Stef

&lt;/pre&gt;</description>
    <dc:creator>Stef Bon</dc:creator>
    <dc:date>2011-01-31T09:34:42</dc:date>
  </item>
  <item rdf:about="http://comments.gmane.org/gmane.linux.kernel.aio.general/2954">
    <title>Problem using compile option -D_FILE_OFFSET_BITS=64.</title>
    <link>http://comments.gmane.org/gmane.linux.kernel.aio.general/2954</link>
    <description>&lt;pre&gt;Hi,

I'm trying to make my fuse fs use aio.

I've sent an email about this earlier to this list.

Now I've written a small test program to test aio in combination with
signals and epoll.

This program works very well, at least when it's compiled like:


gcc -Wall -lrt testsignal.c -o testsignal


It's an advanced version of the test program found in the manpage of signalfd.

Now test it:

./testsignal
mainloop: adding signalfd 4 to epoll
^CGot SIGINT: start the test aio read.


The progress in the logfile:

Jan 28 11:58:08 clfs20091030 testsignal: aio_read_file
Jan 28 11:58:08 clfs20091030 testsignal: aio_read return: 0
Jan 28 11:58:08 clfs20091030 testsignal: mainloop aio read, no error,
nbytes: 14196767
Jan 28 11:58:08 clfs20091030 testsignal: aio_write_file
Jan 28 11:58:08 clfs20091030 testsignal: aio_write return: 0
Jan 28 11:58:08 clfs20091030 testsignal: mainloop aio write, no error,
nbytes: 14196767


This is as expected.

Now when I compile it using -D_FILE_OFFSET_BITS=64, this is the log output
when Ctrl-C is pressed (and the aio read is started):

Jan 28 11:50:27 clfs20091030 testsignal: aio_read_file
Jan 28 11:50:27 clfs20091030 testsignal: aio_read return: 0
Jan 28 11:50:27 clfs20091030 testsignal: unknown aio opcode 128


Does anyone have an idea?

Thanks in advance. This may be the problem with making aio work with fuse.
Fuse must be compiled using the -D_FILE_... flag.

Stef
&lt;/pre&gt;</description>
    <dc:creator>Stef Bon</dc:creator>
    <dc:date>2011-01-28T11:57:04</dc:date>
  </item>
  <item rdf:about="http://comments.gmane.org/gmane.linux.kernel.aio.general/2951">
    <title>problem using aio in fuse fs.</title>
    <link>http://comments.gmane.org/gmane.linux.kernel.aio.general/2951</link>
    <description>&lt;pre&gt;Hi,

I'm writing a fuse fs, fuse-workspace, which helps to create a
user-friendly, effective and powerful layer for the average user. See:


http://linux.bononline.nl/wiki/index.php/Changes_and_issues

for changes and recent screenshots.

Mainpage:

http://linux.bononline.nl/wiki/index.php/Mount.md5key


Well, the fuse-fs is the cosmetic layer, especially when in "GoboLinux" mode.
Blocking behaviour is very unwanted, because it will block every app
running in userspace that wants to do some io.
And then I mean every io.

Reads and writes are blocking at the moment.


Now FUSE offers a construction to handle aio very easily:
a single thread, with an epoll instance in the mainloop that by
default watches the fuse fd for incoming fs events, like the calls
getattr, open and read.
The processing of such a request is done in a fuse-fs-specific call.
When ready it sends the results back using fuse_reply, using the
request handle to identify it.


Using aio here is very possible by:

a. adding an eventfd to the list of fds that epoll listens to when the
program starts (= fs is mounted) and initializing the io_context using
io_setup


if ( global_mainloop_io_data-&amp;gt;use_aio&amp;gt;0) {

    memset(&amp;amp;fuse_io_context, 0, sizeof(fuse_io_context));
    res=io_setup(128, &amp;amp;fuse_io_context);

    if ( res == 0 ) {

syslog(LOG_DEBUG, "Created io context...(eventfd: %i)",
global_mainloop_io_data-&amp;gt;io_eventfd);

global_mainloop_io_data-&amp;gt;fuse_io_context=fuse_io_context;

    } else {

syslog(LOG_DEBUG, "Failed to create io context...(eventfd: %i)",
global_mainloop_io_data-&amp;gt;io_eventfd);

global_mainloop_io_data-&amp;gt;use_aio=0;

    }

}


and

// set io data

if ( global_mainloop_io_data ) {

    if ( global_mainloop_io_data-&amp;gt;use_eventfd&amp;gt;0 ) {

fprintf(stdout, "using eventfd\n");

global_mainloop_io_data-&amp;gt;io_eventfd=eventfd(0, 0);

if ( global_mainloop_io_data-&amp;gt;io_eventfd==-1 ) {

    global_mainloop_io_data-&amp;gt;io_eventfd=0;
    fprintf(stderr, "error %i init eventfd", errno);

}

    }

}



b. when a read request is sent to the fs, the fs-specific call has to
initialize an aio read like:

read:

static int aio_read_localhost(fuse_req_t req, size_t size, off_t
offset, struct fuse_file_info *fi)
{
int nreturn=0;
uid_t uid_keep;
gid_t gid_keep;
mode_t umask_keep;
const struct fuse_ctx *ctx=fuse_req_ctx(req);
struct iocb *riocb;
struct iocb *priocb[1];
void *tmpbuf;
size_t aiosize;

if ( xmpfs_options.logging &amp;gt; 0 ) syslog(LOG_DEBUG, "aio_read_localhost");


// create an array of one element.....

riocb = malloc( sizeof ( struct iocb));

if ( riocb==NULL ) {

    nreturn=-ENOMEM;
    goto out;

}

// create a memaligned buffer for aio

aiosize=((size-1)/pagesize + 1)*pagesize;
if ( xmpfs_options.logging &amp;gt; 0 ) syslog(LOG_DEBUG,
"using aiosize: %lu", aiosize);

nreturn=posix_memalign(&amp;amp;tmpbuf, pagesize, aiosize);

if ( nreturn!=0 ) {

    free(riocb);
    nreturn=abs(nreturn);
    goto out;

}

memset(tmpbuf, 0, aiosize);

io_prep_pread(riocb, fi-&amp;gt;fh, tmpbuf, size, offset);

priocb[0]=riocb;

riocb-&amp;gt;data=req;

// let the aio send to eventfd when completed
if ( xmpfs_options.logging &amp;gt; 0 ) syslog(LOG_DEBUG, "set eventfd: %i",
global_mainloop_io_data-&amp;gt;io_eventfd);

io_set_eventfd(riocb, global_mainloop_io_data-&amp;gt;io_eventfd);

uid_keep=setfsuid(ctx-&amp;gt;uid);
gid_keep=setfsgid(ctx-&amp;gt;gid);
umask_keep=umask(ctx-&amp;gt;umask);

nreturn=io_submit(global_mainloop_io_data-&amp;gt;fuse_io_context, 1, priocb);

if ( xmpfs_options.logging &amp;gt; 0 ) syslog(LOG_DEBUG,
"submit return: %i", nreturn);

// io_submit returns the number of jobs submitted if successful

if ( nreturn&amp;gt;0 ) nreturn=0;

umask(umask_keep);
setfsuid(uid_keep);
setfsgid(gid_keep);

out:

if ( xmpfs_options.logging &amp;gt; 0 ) syslog(LOG_DEBUG,
"read_localhost return: %i", nreturn);

return nreturn;

}

and write:

static int aio_write_localhost(fuse_req_t req, const char *buf, size_t
size, off_t off, struct fuse_file_info *fi)
{
    int nreturn=0;
    uid_t uid_keep;
    gid_t gid_keep;
    const struct fuse_ctx *ctx=fuse_req_ctx(req);
    struct iocb *wiocb;
    struct iocb *pwiocb[1];
    void *tmpbuf;
    size_t aiosize;

    if ( xmpfs_options.logging &amp;gt; 0 ) syslog(LOG_DEBUG, "aio_write_localhost");


    // create an array of one element.....

    wiocb = malloc(sizeof(struct iocb));

    if ( wiocb==NULL ) {

nreturn=-ENOMEM;
goto out;

    }

    // create a memaligned buffer for aio

    aiosize=((size-1)/pagesize + 1)*pagesize;

    if ( xmpfs_options.logging &amp;gt; 0 ) syslog(LOG_DEBUG,
"using aiosize: %lu", aiosize);

    nreturn=posix_memalign(&amp;amp;tmpbuf, pagesize, aiosize);

    if ( nreturn!=0 ) {

free(wiocb);
nreturn=abs(nreturn);
goto out;

    }

    memset(tmpbuf, 0, aiosize);

    // for a write copy the to be written buffer to tmpbuf

    memcpy(tmpbuf, buf, size);

    io_prep_pwrite(wiocb, fi-&amp;gt;fh, tmpbuf, size, off);

    pwiocb[0]=wiocb;

    wiocb-&amp;gt;data=req;

    // make the aio command use the eventfd when completed

    if ( xmpfs_options.logging &amp;gt; 0 ) syslog(LOG_DEBUG,
"set eventfd: %i", global_mainloop_io_data-&amp;gt;io_eventfd);

    io_set_eventfd(wiocb, global_mainloop_io_data-&amp;gt;io_eventfd);

    uid_keep=setfsuid(ctx-&amp;gt;uid);
    gid_keep=setfsgid(ctx-&amp;gt;gid);

    nreturn=io_submit(global_mainloop_io_data-&amp;gt;fuse_io_context, 1, pwiocb);

    if ( xmpfs_options.logging &amp;gt; 0 ) syslog(LOG_DEBUG,
"submit return: %i", nreturn);

    // io_submit returns the number of jobs submitted if successful

    if ( nreturn&amp;gt;0 ) nreturn=0;

    setfsuid(uid_keep);
    setfsgid(gid_keep);

    out:

    if ( xmpfs_options.logging &amp;gt; 0 ) syslog(LOG_DEBUG,
"aio_write_localhost, return: %i", nreturn);

    return nreturn;

}

As you can see, these calls start the aio operation and continue
without waiting for the request to finish, which is a good thing!


But then the mainloop:


#include &amp;lt;time.h&amp;gt;

int fuse_session_loop(struct fuse_session *se)
{
int res = 0, fuse_fd, epoll_fd, ev_fd=0;
struct epoll_event epoll_events[MAXPOLLSIZE];
int i,j;
uint64_t event_nr;
ssize_t event_len;
long nr_io_events;
struct fuse_chan *ch = fuse_session_next_chan(se, NULL);
size_t bufsize = fuse_chan_bufsize(ch);
char *buf = (char *) malloc(bufsize);
struct io_event *io_events;
struct fuse_req *req;
struct mainloop_io_data_struct *mainloop_io_data;
struct timespec *io_timeout;


if (!buf) {
fprintf(stderr, "fuse: failed to allocate read buffer\n");
return -1;
}

// create an epoll instance

epoll_fd=epoll_create(MAXPOLLSIZE);

// determine the fd of the fuse instance/channel

fuse_fd=fuse_chan_fd(ch);

// add this fd to the epoll instance

static struct epoll_event ev_fuse;
ev_fuse.events=EPOLLIN | EPOLLOUT;
ev_fuse.data.fd=fuse_fd;
res=epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fuse_fd, &amp;amp;ev_fuse);

if ( res==-1 ) {

    res=-errno;
    goto exit;

}

mainloop_io_data=(struct mainloop_io_data_struct *) fuse_session_userdata(se);

if ( mainloop_io_data ) {

ev_fd=mainloop_io_data-&amp;gt;io_eventfd;

}

if ( ev_fd&amp;gt;0 ) {

    // there is an eventfd..
    // add to the list of fd's for epoll to listen to

    syslog(LOG_DEBUG, "mainloop: adding eventfd %i to epoll", ev_fd);

    static struct epoll_event ev_aio_eventfd;
    ev_aio_eventfd.events=EPOLLIN;
    ev_aio_eventfd.data.fd=ev_fd;

    res=epoll_ctl(epoll_fd, EPOLL_CTL_ADD, ev_fd, &amp;amp;ev_aio_eventfd);

    if ( res==-1 ) {

res=-errno;
goto exit;

    }

    io_timeout=malloc(sizeof(struct timespec));

    memset(io_timeout, 0, sizeof(struct timespec));
    io_timeout-&amp;gt;tv_nsec=0;
    io_timeout-&amp;gt;tv_sec=0;

    io_events=(struct io_event *) malloc(MAX_NUM_EVENTS *
sizeof(struct io_event));

} else {

    syslog(LOG_DEBUG, "mainloop: no eventfd, error %i", errno);

}




while (1) {


    int number_of_fds=epoll_wait(epoll_fd, epoll_events, MAXPOLLSIZE, -1);

    if (number_of_fds &amp;lt; 0) {
fprintf(stderr, "fuse: epoll_wait error");
goto exit;
    }

    for (i=0; i&amp;lt;number_of_fds; i++) {

int fd=epoll_events[i].data.fd;

if ( fd == fuse_fd ) {

    // it's an fuse thing

    // first handle eventual exit

    if (fuse_session_exited(se) != 0) {

goto exit;

    }

    struct fuse_chan *tmpch = ch;

    res = fuse_chan_recv(&amp;amp;tmpch, buf, bufsize);

    if (res&amp;gt;0) {

fuse_session_process(se, buf, res, tmpch);

    } else if ( fuse_session_exited(se) != 0 ) {

goto exit;

    }

} else if ( ev_fd&amp;gt;0 &amp;amp;&amp;amp; fd == ev_fd ) {

    // some data on eventfd

    event_len=read(ev_fd, &amp;amp;event_nr, sizeof(uint64_t));

    syslog(LOG_DEBUG, "mainloop: in event loop");

    if ( event_len == sizeof(uint64_t) ) {

// get the pointer to the aio data
// do not wait, just get the nr of events ready

syslog(LOG_DEBUG, "mainloop: event_nr: %li", event_nr);

if ( event_nr &amp;gt; MAX_NUM_EVENTS ) event_nr=MAX_NUM_EVENTS;

// io_events=(struct io_event *) malloc(event_nr * sizeof(struct io_event));

memset(io_events, 0, MAX_NUM_EVENTS * sizeof(struct io_event));

io_timeout-&amp;gt;tv_nsec=10;
io_timeout-&amp;gt;tv_sec=0;


nr_io_events=io_getevents(mainloop_io_data-&amp;gt;fuse_io_context, 1,
event_nr, io_events, NULL);


syslog(LOG_DEBUG, "mainloop: : nr_io_events %li",nr_io_events);


for (j=0; j&amp;lt;nr_io_events; j++) {


    if ( ! io_events[j].obj || ! io_events[j].obj-&amp;gt;u.c.buf ) {

syslog(LOG_DEBUG, "error, not everythin set..");

continue;

    }

    req=(struct fuse_req *) io_events[j].data;

    if ( ! req ) {

syslog(LOG_DEBUG, "error, request not set....");
continue;

    }

    if ( io_events[j].obj-&amp;gt;aio_lio_opcode==IO_CMD_PREAD ) {

// aio read

if ( io_events[j].res2==0 ) {

    // no error

    syslog(LOG_DEBUG, "aio read, bytes read %li", io_events[j].res);

    fuse_reply_buf(req, io_events[j].obj-&amp;gt;u.c.buf, io_events[j].res);

} else {

    // error

    syslog(LOG_DEBUG, "aio read, error %li", io_events[j].res2);

    fuse_reply_err(req, io_events[j].res2);

}


    } else if ( io_events[j].obj-&amp;gt;aio_lio_opcode==IO_CMD_PWRITE ) {


// aio write

if ( io_events[j].res2==0 ) {

    // no error

    syslog(LOG_DEBUG, "aio write, bytes written %li", io_events[j].res);

    fuse_reply_write(req, io_events[j].res);

} else {

    // error

    syslog(LOG_DEBUG, "aio write, error %li", io_events[j].res2);

    fuse_reply_err(req, io_events[j].res2);

}

    } else {

syslog(LOG_DEBUG, "unknown aio opcode %i",
io_events[j].obj-&amp;gt;aio_lio_opcode);

    }

    // free the buffer

    if (io_events[j].obj-&amp;gt;u.c.buf) free(io_events[j].obj-&amp;gt;u.c.buf);

    free(io_events[j].obj);

}

    }

}

    }

}

exit:

free(buf);
fuse_session_reset(se);

return res &amp;lt; 0 ? -1 : 0;
}


After mounting the fs, which is in this case a simple overlay fs:

fusexmp-fh-ll-aio ~/mount -o logging=2,io_aio=yes

This fs mounts / under the mountpoint (here: ~/mount) like a bind mount.

Now everything works until I do a read.
On the command line:

cat etc/ulogd.conf


This command hangs, waiting for the reply to the request.
The logfile says:

Jan 26 12:32:20 clfs20091030 fusexmp fh ll aio: open
Jan 26 12:32:20 clfs20091030 fusexmp fh ll aio: adding O_NONBLOCK to open flags
Jan 26 12:32:20 clfs20091030 fusexmp fh ll aio: find_inode
Jan 26 12:32:20 clfs20091030 fusexmp fh ll aio: inode 21 found
Jan 26 12:32:20 clfs20091030 fusexmp fh ll aio: determine_fuse_path.
Jan 26 12:32:20 clfs20091030 fusexmp fh ll aio: name: ulogd.conf
Jan 26 12:32:20 clfs20091030 fusexmp fh ll aio: determine_fuse_path:
Jan 26 12:32:20 clfs20091030 fusexmp fh ll aio: path: etc/ulogd.conf
Jan 26 12:32:20 clfs20091030 fusexmp fh ll aio: determine_localhost_path
Jan 26 12:32:20 clfs20091030 fusexmp fh ll aio: determine_localhost_path
Jan 26 12:32:20 clfs20091030 fusexmp fh ll aio: pathinfo-&amp;gt;abspath:
/etc/ulogd.conf
Jan 26 12:32:20 clfs20091030 fusexmp fh ll aio: pathinfo-&amp;gt;relpath:
/etc/ulogd.conf
Jan 26 12:32:20 clfs20091030 fusexmp fh ll aio: fd: 0
Jan 26 12:32:20 clfs20091030 fusexmp fh ll aio: open_localhost
Jan 26 12:32:20 clfs20091030 fusexmp fh ll aio: open_localhost,
opening via abs path
Jan 26 12:32:20 clfs20091030 fusexmp fh ll aio: open_localhost: fd: 7
Jan 26 12:32:20 clfs20091030 fusexmp fh ll aio: open nreturn: 7
Jan 26 12:32:20 clfs20091030 fusexmp fh ll aio: read
Jan 26 12:32:20 clfs20091030 fusexmp fh ll aio: aio_read_localhost
Jan 26 12:32:20 clfs20091030 fusexmp fh ll aio: using aiosize: 8192
Jan 26 12:32:20 clfs20091030 fusexmp fh ll aio: set eventfd: 5
Jan 26 12:32:20 clfs20091030 fusexmp fh ll aio: submit return: 1
Jan 26 12:32:20 clfs20091030 fusexmp fh ll aio: read_localhost return: 0
Jan 26 12:32:20 clfs20091030 fusexmp fh ll aio: aio , no error: 0
Jan 26 12:32:22 clfs20091030 fusexmp fh ll aio: getattr
Jan 26 12:32:22 clfs20091030 fusexmp fh ll aio: find_inode
Jan 26 12:32:22 clfs20091030 fusexmp fh ll aio: inode 1 found
Jan 26 12:32:22 clfs20091030 fusexmp fh ll aio: determine_fuse_path.
Jan 26 12:32:22 clfs20091030 fusexmp fh ll aio: name: .
Jan 26 12:32:22 clfs20091030 fusexmp fh ll aio: determine_fuse_path:
Jan 26 12:32:22 clfs20091030 fusexmp fh ll aio: path: .
Jan 26 12:32:22 clfs20091030 fusexmp fh ll aio: getattr, entry, name: .
Jan 26 12:32:22 clfs20091030 fusexmp fh ll aio: determine_localhost_path
Jan 26 12:32:22 clfs20091030 fusexmp fh ll aio: determine_localhost_path
Jan 26 12:32:22 clfs20091030 fusexmp fh ll aio: pathinfo-&amp;gt;abspath: /.
Jan 26 12:32:22 clfs20091030 fusexmp fh ll aio: pathinfo-&amp;gt;relpath: /.
Jan 26 12:32:22 clfs20091030 fusexmp fh ll aio: fd: 0
Jan 26 12:32:22 clfs20091030 fusexmp fh ll aio: getattr_localhost
Jan 26 12:32:22 clfs20091030 fusexmp fh ll aio: getattr, return: 0
Jan 26 12:32:22 clfs20091030 fusexmp fh ll aio: mainloop: in event loop
Jan 26 12:32:22 clfs20091030 fusexmp fh ll aio: mainloop: event_nr: 1
Jan 26 12:32:22 clfs20091030 fusexmp fh ll aio: mainloop: : nr_io_events -14

This explains the blocking of the read call: the handling of the
completion notification from the eventfd is not right!
Well, it's good to see that the eventfd is notified and epoll works,
but then io_getevents gives the error
14, which is EFAULT.

According to the manpage of io_getevents, this means that either the
timeout or the events pointer is invalid.
I do not understand. Both are declared as written in the manpage:

struct io_event *io_events

and

struct timespec *io_timeout


Does anyone have an idea?

Stef Bon
Voorburg
the Netherlands

&lt;/pre&gt;</description>
    <dc:creator>Stef Bon</dc:creator>
    <dc:date>2011-01-26T12:02:12</dc:date>
  </item>
  <item rdf:about="http://comments.gmane.org/gmane.linux.kernel.aio.general/2949">
    <title>[patch] aio: remove unused function, aio_run_iocbs</title>
    <link>http://comments.gmane.org/gmane.linux.kernel.aio.general/2949</link>
    <description>&lt;pre&gt;Hi,

aio_run_iocbs is not used at all, so get rid of it.

Signed-off-by: Jeff Moyer &amp;lt;jmoyer&amp;lt; at &amp;gt;redhat.com&amp;gt;

diff --git a/fs/aio.c b/fs/aio.c
index 8c8f6c5..6ca2f96 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -798,30 +798,14 @@ static void aio_queue_work(struct kioctx * ctx)
 queue_delayed_work(aio_wq, &amp;amp;ctx-&amp;gt;wq, timeout);
 }
 
-
 /*
- * aio_run_iocbs:
+ * aio_run_all_iocbs:
  * Process all pending retries queued on the ioctx
- * run list.
+ * run list, and keep running them until the list
+ * stays empty.
  * Assumes it is operating within the aio issuer's mm
  * context.
  */
-static inline void aio_run_iocbs(struct kioctx *ctx)
-{
-int requeue;
-
-spin_lock_irq(&amp;amp;ctx-&amp;gt;ctx_lock);
-
-requeue = __aio_run_iocbs(ctx);
-spin_unlock_irq(&amp;amp;ctx-&amp;gt;ctx_lock);
-if (requeue)
-aio_queue_work(ctx);
-}
-
-/*
- * just like aio_run_iocbs, but keeps running them until
- * the list stays empty
- */
 static inline void aio_run_all_iocbs(struct kioctx *ctx)
 {
 spin_lock_irq(&amp;amp;ctx-&amp;gt;ctx_lock);

&lt;/pre&gt;</description>
    <dc:creator>Jeff Moyer</dc:creator>
    <dc:date>2011-01-05T21:19:50</dc:date>
  </item>
  <item rdf:about="http://comments.gmane.org/gmane.linux.kernel.aio.general/2940">
    <title>[PATCH] aio: remove unnecessary check</title>
    <link>http://comments.gmane.org/gmane.linux.kernel.aio.general/2940</link>
    <description>&lt;pre&gt;'nr &amp;gt;= min_nr &amp;gt;= 0' always satisfies 'nr &amp;gt;= 0' so the check is unnecessary.

Signed-off-by: Namhyung Kim &amp;lt;namhyung&amp;lt; at &amp;gt;gmail.com&amp;gt;
---
 fs/aio.c |    2 +-
 1 files changed, 1 insertions(+), 1 deletions(-)

diff --git a/fs/aio.c b/fs/aio.c
index c56153f5f73e..45766460fa57 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -1839,7 +1839,7 @@ SYSCALL_DEFINE5(io_getevents, aio_context_t, ctx_id,
 long ret = -EINVAL;
 
 if (likely(ioctx)) {
-if (likely(min_nr &amp;lt;= nr &amp;amp;&amp;amp; min_nr &amp;gt;= 0 &amp;amp;&amp;amp; nr &amp;gt;= 0))
+if (likely(min_nr &amp;lt;= nr &amp;amp;&amp;amp; min_nr &amp;gt;= 0))
 ret = read_events(ioctx, min_nr, nr, events, timeout);
 put_ioctx(ioctx);
 }
&lt;/pre&gt;</description>
    <dc:creator>Namhyung Kim</dc:creator>
    <dc:date>2010-12-16T09:09:05</dc:date>
  </item>
  <item rdf:about="http://comments.gmane.org/gmane.linux.kernel.aio.general/2938">
    <title>[PATCH] aio: using hash table for active requests</title>
    <link>http://comments.gmane.org/gmane.linux.kernel.aio.general/2938</link>
    <description>&lt;pre&gt;This patch removes a TODO in fs/aio.c, which is to use a hash table for active requests.

I prefer adding an iocb at the tail of the collision chain, so I do not use hlist here.

Signed-off-by: Li Yu &amp;lt;raise.sail&amp;lt; at &amp;gt;gmail.com&amp;gt;
---

 fs/aio.c            |   90 ++++++++++++++++++++++++++++++++++++++--------------
 include/linux/aio.h |    2 -
 2 files changed, 68 insertions(+), 24 deletions(-)

diff --git a/fs/aio.c b/fs/aio.c
index 8c8f6c5..fee2aa3 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -65,6 +65,15 @@ static DECLARE_WORK(fput_work, aio_fput_routine);
 static DEFINE_SPINLOCK(fput_lock);
 static LIST_HEAD(fput_head);
 
+#if BITS_PER_LONG == 64
+#define AIO_ACTREQ_BUCKETS_SHIFT        36
+#elif BITS_PER_LONG == 32
+#define AIO_ACTREQ_BUCKETS_SHIFT        24
+#endif
+
+/* AIO_ACTREQ_BUCKETS must be power of 2 */
+#define AIO_ACTREQ_BUCKETS        (2*PAGE_SIZE/sizeof(struct list_head))
+
 #define AIO_BATCH_HASH_BITS 3 /* allocated on-stack, so don't go crazy */
 #define AIO_BATCH_HASH_SIZE (1 &amp;lt;&amp;lt; AIO_BATCH_HASH_BITS)
 struct aio_batch_entry {
@@ -212,6 +221,9 @@ static void ctx_rcu_free(struct rcu_head *head)
 struct kioctx *ctx = container_of(head, struct kioctx, rcu_head);
 unsigned nr_events = ctx-&amp;gt;max_reqs;
 
+kfree(ctx-&amp;gt;active_reqs_table);
+ctx-&amp;gt;active_reqs_table = NULL;
+
 kmem_cache_free(kioctx_cachep, ctx);
 
 if (nr_events) {
@@ -249,6 +261,19 @@ static void __put_ioctx(struct kioctx *ctx)
 __put_ioctx(kioctx);\
 } while (0)
 
+static int ioctx_active_reqs_init(struct kioctx *ctx)
+{
+int i;
+
+ctx-&amp;gt;active_reqs_table = kmalloc(AIO_ACTREQ_BUCKETS*sizeof(struct list_head), GFP_KERNEL);
+if (!ctx-&amp;gt;active_reqs_table)
+return 1;
+/* we want to use list_add_tail(), hlist does not provide this API so far ... */
+for (i = 0; i &amp;lt; AIO_ACTREQ_BUCKETS; ++i)
+INIT_LIST_HEAD(ctx-&amp;gt;active_reqs_table+i);
+return 0;
+}
+
 /* ioctx_alloc
 * Allocates and initializes an ioctx.  Returns an ERR_PTR if it failed.
  */
@@ -281,7 +306,8 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
 spin_lock_init(&amp;amp;ctx-&amp;gt;ring_info.ring_lock);
 init_waitqueue_head(&amp;amp;ctx-&amp;gt;wait);
 
-INIT_LIST_HEAD(&amp;amp;ctx-&amp;gt;active_reqs);
+if (ioctx_active_reqs_init(ctx))
+goto out_freectx;
 INIT_LIST_HEAD(&amp;amp;ctx-&amp;gt;run_list);
 INIT_DELAYED_WORK(&amp;amp;ctx-&amp;gt;wq, aio_kick_handler);
 
@@ -331,6 +357,21 @@ out_freectx:
 return ctx;
 }
 
+static inline void aio_cancel_one(struct kioctx *ctx, struct kiocb *iocb)
+{
+int (*cancel)(struct kiocb *, struct io_event *);
+struct io_event res;
+
+cancel = iocb-&amp;gt;ki_cancel;
+kiocbSetCancelled(iocb);
+if (cancel) {
+iocb-&amp;gt;ki_users++;
+spin_unlock_irq(&amp;amp;ctx-&amp;gt;ctx_lock);
+cancel(iocb, &amp;amp;res);
+spin_lock_irq(&amp;amp;ctx-&amp;gt;ctx_lock);
+}
+}
+
 /* aio_cancel_all
 * Cancels all outstanding aio requests on an aio context.  Used
 * when the processes owning a context have all exited to encourage
@@ -338,22 +379,21 @@ out_freectx:
  */
 static void aio_cancel_all(struct kioctx *ctx)
 {
-int (*cancel)(struct kiocb *, struct io_event *);
-struct io_event res;
+int i, cleaned;
+i = cleaned = 0;
+
 spin_lock_irq(&amp;amp;ctx-&amp;gt;ctx_lock);
 ctx-&amp;gt;dead = 1;
-while (!list_empty(&amp;amp;ctx-&amp;gt;active_reqs)) {
-struct list_head *pos = ctx-&amp;gt;active_reqs.next;
-struct kiocb *iocb = list_kiocb(pos);
-list_del_init(&amp;amp;iocb-&amp;gt;ki_list);
-cancel = iocb-&amp;gt;ki_cancel;
-kiocbSetCancelled(iocb);
-if (cancel) {
-iocb-&amp;gt;ki_users++;
-spin_unlock_irq(&amp;amp;ctx-&amp;gt;ctx_lock);
-cancel(iocb, &amp;amp;res);
-spin_lock_irq(&amp;amp;ctx-&amp;gt;ctx_lock);
+for (; i &amp;lt; AIO_ACTREQ_BUCKETS; i++) {
+while (!list_empty(&amp;amp;ctx-&amp;gt;active_reqs_table[i])) {
+struct list_head *pos = ctx-&amp;gt;active_reqs_table[i].next;
+struct kiocb *iocb = list_kiocb(pos);
+list_del_init(pos);
+aio_cancel_one(ctx, iocb);
+++cleaned;
 }
+if (cleaned &amp;gt;= ctx-&amp;gt;reqs_active)
+break;
 }
 spin_unlock_irq(&amp;amp;ctx-&amp;gt;ctx_lock);
 }
@@ -440,8 +480,9 @@ void exit_aio(struct mm_struct *mm)
  * This prevents races between the aio code path referencing the
  * req (after submitting it) and aio_complete() freeing the req.
  */
-static struct kiocb *__aio_get_req(struct kioctx *ctx)
+static struct kiocb *__aio_get_req(struct kioctx *ctx, void* tohash)
 {
+unsigned long bucket;
 struct kiocb *req = NULL;
 struct aio_ring *ring;
 int okay = 0;
@@ -465,10 +506,12 @@ static struct kiocb *__aio_get_req(struct kioctx *ctx)
 /* Check if the completion queue has enough free space to
  * accept an event from this io.
  */
+bucket = hash_long((unsigned long)tohash, AIO_ACTREQ_BUCKETS_SHIFT);
+bucket &amp;amp;= (AIO_ACTREQ_BUCKETS - 1);
 spin_lock_irq(&amp;amp;ctx-&amp;gt;ctx_lock);
 ring = kmap_atomic(ctx-&amp;gt;ring_info.ring_pages[0], KM_USER0);
 if (ctx-&amp;gt;reqs_active &amp;lt; aio_ring_avail(&amp;amp;ctx-&amp;gt;ring_info, ring)) {
-list_add(&amp;amp;req-&amp;gt;ki_list, &amp;amp;ctx-&amp;gt;active_reqs);
+list_add_tail(&amp;amp;req-&amp;gt;ki_list, &amp;amp;ctx-&amp;gt;active_reqs_table[bucket]);
 ctx-&amp;gt;reqs_active++;
 okay = 1;
 }
@@ -483,17 +526,17 @@ static struct kiocb *__aio_get_req(struct kioctx *ctx)
 return req;
 }
 
-static inline struct kiocb *aio_get_req(struct kioctx *ctx)
+static inline struct kiocb *aio_get_req(struct kioctx *ctx, void *iocb)
 {
 struct kiocb *req;
 /* Handle a potential starvation case -- should be exceedingly rare as 
  * requests will be stuck on fput_head only if the aio_fput_routine is 
  * delayed and the requests were the last user of the struct file.
  */
-req = __aio_get_req(ctx);
+req = __aio_get_req(ctx, iocb);
 if (unlikely(NULL == req)) {
 aio_fput_routine(NULL);
-req = __aio_get_req(ctx);
+req = __aio_get_req(ctx, iocb);
 }
 return req;
 }
@@ -1605,7 +1648,7 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
 if (unlikely(!file))
 return -EBADF;
 
-req = aio_get_req(ctx);/* returns with 2 references to req */
+req = aio_get_req(ctx, user_iocb);/* returns with 2 references to req */
 if (unlikely(!req)) {
 fput(file);
 return -EAGAIN;
@@ -1744,11 +1787,12 @@ static struct kiocb *lookup_kiocb(struct kioctx *ctx, struct iocb __user *iocb,
   u32 key)
 {
 struct list_head *pos;
+unsigned long bucket;
 
 assert_spin_locked(&amp;amp;ctx-&amp;gt;ctx_lock);
-
-/* TODO: use a hash or array, this sucks. */
-list_for_each(pos, &amp;amp;ctx-&amp;gt;active_reqs) {
+bucket = hash_long((unsigned long)iocb, AIO_ACTREQ_BUCKETS_SHIFT);
+bucket &amp;amp;= (AIO_ACTREQ_BUCKETS - 1);
+list_for_each(pos, &amp;amp;ctx-&amp;gt;active_reqs_table[bucket]) {
 struct kiocb *kiocb = list_kiocb(pos);
 if (kiocb-&amp;gt;ki_obj.user == iocb &amp;amp;&amp;amp; kiocb-&amp;gt;ki_key == key)
 return kiocb;
diff --git a/include/linux/aio.h b/include/linux/aio.h
index 7a8db41..1cf394b 100644
--- a/include/linux/aio.h
+++ b/include/linux/aio.h
@@ -189,7 +189,7 @@ struct kioctx {
 spinlock_t ctx_lock;
 
 int reqs_active;
-struct list_head active_reqs; /* used for cancellation */
+struct list_head *active_reqs_table; /* used for cancellation */
 struct list_head run_list; /* used for kicked reqs */
 
 /* sys_io_setup currently limits this to an unsigned int */

&lt;/pre&gt;</description>
    <dc:creator>Li Yu</dc:creator>
    <dc:date>2010-12-15T03:03:05</dc:date>
  </item>
  <item rdf:about="http://comments.gmane.org/gmane.linux.kernel.aio.general/2936">
    <title>[PATCH] aio: check return value of create_workqueue()</title>
    <link>http://comments.gmane.org/gmane.linux.kernel.aio.general/2936</link>
    <description>&lt;pre&gt;Signed-off-by: Namhyung Kim &amp;lt;namhyung&amp;lt; at &amp;gt;gmail.com&amp;gt;
---
 fs/aio.c |    2 +-
 1 files changed, 1 insertions(+), 1 deletions(-)

diff --git a/fs/aio.c b/fs/aio.c
index 8c8f6c5b6d79..c56153f5f73e 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -87,7 +87,7 @@ static int __init aio_setup(void)
 
 aio_wq = create_workqueue("aio");
 abe_pool = mempool_create_kmalloc_pool(1, sizeof(struct aio_batch_entry));
-BUG_ON(!abe_pool);
+BUG_ON(!aio_wq || !abe_pool);
 
 pr_debug("aio_setup: sizeof(struct page) = %d\n", (int)sizeof(struct page));
 
&lt;/pre&gt;</description>
    <dc:creator>Namhyung Kim</dc:creator>
    <dc:date>2010-12-13T15:06:25</dc:date>
  </item>
  <textinput rdf:about="http://search.gmane.org/?group=$group=gmane.linux.kernel.aio.general">
    <title>Search Engine</title>
    <description>Search the mailing list at Gmane</description>
    <name>query</name>
    <link>http://search.gmane.org/?group=$group=gmane.linux.kernel.aio.general</link>
  </textinput>
</rdf:RDF>

