![[LWN Logo]](/images/lcorner.png) |
|
![[LWN.net]](/images/Included.png) |
From: "Manfred Spraul" <manfred@colorfullife.com>
To: <linux-kernel@vger.kernel.org>
Subject: zerocopy pipe, new version
Date: Sat, 5 Jan 2002 23:27:25 +0100
Cc: <lse-tech@lists.sourceforge.net>
Attached is a new version of the zerocopy pipe code.
A few months ago, Hubertus Franke found a severe performance problem with my last
version. Now I figured out how I can solve it:
For good pipe performance, sys_read() must try to return as much data as possible with
one syscall, even if the writer writes small bits. The current code uses
PIPE_WAITING_WRITERS, but that doesn't work with nonblocking IO and is not that
efficient.
I added a sched_yield() into that path, and that fixed the performance degradation:
if pipe_read made progress and the user wants even more data, then call sched_yield() and
give the writers a chance to write additional data. After sched_yield() returns, try again until
there is either no more data, or the user request is completely fullfilled. Then return to userspace.
Now I got +50% on UP with pipeflex -c2 -r32 -w1 with 2.5.2-pre8+zerocopy, SMP kernel
over 2.4.2-UP.
Unfortunately the patch has virtually no effect on 4 kB write-4 kB read, and that's the most
common case. (number of context switches cut in half, but a slight performance loss on K6,
probably due to cache trashing)
The only solution I see for that problem are larger kernel buffers - more data must be queued
in kernel. Either on-demand (kmalloc() up to <sysctl-limit, around 512 kB> if a request for
<= 4096 bytes arrives and would block. If the allocation fails, then block), or at pipe creation
like in Hubertus' patch.
--
Manfred
// $Header$
// Kernel Version:
// VERSION = 2
// PATCHLEVEL = 5
// SUBLEVEL = 2
// EXTRAVERSION =-pre8
--- 2.5/fs/pipe.c Thu Oct 11 15:20:16 2001
+++ build-2.5/fs/pipe.c Wed Jan 2 18:12:28 2002
@@ -2,6 +2,9 @@
* linux/fs/pipe.c
*
* Copyright (C) 1991, 1992, 1999 Linus Torvalds
+ *
+ * Major pipe_read() and pipe_write() cleanup: Single copy,
+ * fewer schedules. Copyright (C) 2001 Manfred Spraul
*/
#include <linux/mm.h>
@@ -10,6 +13,8 @@
#include <linux/slab.h>
#include <linux/module.h>
#include <linux/init.h>
+#include <linux/highmem.h>
+#include <linux/compiler.h>
#include <asm/uaccess.h>
#include <asm/ioctls.h>
@@ -36,214 +41,360 @@
down(PIPE_SEM(*inode));
}
+#define PIO_PGCOUNT ((131072+PAGE_SIZE-1)/PAGE_SIZE)
+struct pipe_pio {
+ struct list_head list;
+ struct page *pages[PIO_PGCOUNT];
+ int offset;
+ size_t len;
+ size_t orig_len;
+ struct task_struct *tsk;
+};
+
+static ssize_t
+copy_from_piolist(struct list_head *piolist, void *buf, ssize_t len)
+{
+ struct list_head *walk = piolist->next;
+ int ret = 0;
+ while(walk != piolist && len) {
+ struct pipe_pio* pio = list_entry(walk, struct pipe_pio, list);
+ if (pio->len) {
+ struct page *page;
+ void *maddr;
+ int this_len, off, i;
+ int ret2;
+
+ i = pio->offset/PAGE_SIZE;
+ off = pio->offset%PAGE_SIZE;
+ this_len = len;
+ if (this_len > PAGE_SIZE-off)
+ this_len = PAGE_SIZE-off;
+ if (this_len > pio->len)
+ this_len = pio->len;
+
+ page = pio->pages[i];
+ maddr = kmap(page);
+ ret2 = copy_to_user(buf, maddr+off, this_len);
+ flush_page_to_ram(page);
+ kunmap(page);
+ if (unlikely(ret2)) {
+ if (ret)
+ return ret;
+ return -EFAULT;
+ }
+
+ buf += this_len;
+ len -= this_len;
+ pio->len -= this_len;
+ pio->offset += this_len;
+ ret += this_len;
+ if (pio->len == 0)
+ wake_up_process(pio->tsk);
+ } else {
+ walk = walk->next;
+ }
+ }
+ return ret;
+}
+
+static void
+build_pio(struct pipe_pio *pio, struct inode *inode, const void *buf, size_t count)
+{
+ int len;
+ struct vm_area_struct *vmas[PIO_PGCOUNT];
+
+ pio->tsk = current;
+ pio->len = count;
+ pio->offset = (unsigned long)buf&(PAGE_SIZE-1);
+
+ pio->len = PIO_PGCOUNT*PAGE_SIZE - pio->offset;
+ if (pio->len > count)
+ pio->len = count;
+ len = (pio->offset+pio->len+PAGE_SIZE-1)/PAGE_SIZE;
+ down_read(¤t->mm->mmap_sem);
+ len = get_user_pages(current, current->mm, (unsigned long)buf, len,
+ 0, 0, pio->pages, vmas);
+ if (len > 0) {
+ int i;
+ for(i=0;i<len;i++) {
+ flush_cache_page(vmas[i], addr+i*PAGE_SIZE);
+ }
+ len = len*PAGE_SIZE-pio->offset;
+ if (len < pio->len)
+ pio->len = len;
+ list_add_tail(&pio->list, &PIPE_PIO(*inode));
+ PIPE_PIOLEN(*inode) += pio->len;
+ pio->orig_len = pio->len;
+ } else {
+ pio->list.next = NULL;
+ }
+ up_read(¤t->mm->mmap_sem);
+}
+
+static size_t
+teardown_pio(struct pipe_pio *pio, struct inode *inode, const void *buf)
+{
+ int i;
+ if (!pio->list.next)
+ return 0;
+ for (i=0;i<(pio->len+pio->offset+PAGE_SIZE-1)/PAGE_SIZE;i++) {
+ if (pio->pages[i]) {
+ put_page(pio->pages[i]);
+ }
+ }
+ i = pio->orig_len - pio->len;
+ PIPE_PIOLEN(*inode) -= pio->len;
+ list_del(&pio->list);
+ if (i && pio->len) {
+ /*
+ * We would violate the atomicity requirements:
+ * 1 byte in the internal buffer.
+ * write(fd, buf, PIPE_BUF);
+ * --> doesn't fit into internal buffer, pio build.
+ * read(fd, buf, 200);(i.e. 199 bytes from pio)
+ * signal sent to writer.
+ * The writer must not return with 199 bytes written!
+ * Fortunately the internal buffer will be empty in this
+ * case. Write into the internal buffer before
+ * checking for signals/error conditions.
+ */
+ size_t j = min((size_t)PIPE_SIZE, pio->len);
+ if (PIPE_LEN(*inode)) BUG();
+ if (PIPE_START(*inode)) BUG();
+ if (!copy_from_user(PIPE_BASE(*inode), buf + i, j)) {
+ i += j;
+ PIPE_LEN(*inode) = j;
+ }
+ }
+ return i;
+}
+/*
+ * reader:
+ flush_cache_page(vma, addr);
+ *
+ flush_icache_page(vma, page);
+ */
static ssize_t
pipe_read(struct file *filp, char *buf, size_t count, loff_t *ppos)
{
struct inode *inode = filp->f_dentry->d_inode;
- ssize_t size, read, ret;
+ ssize_t read;
+ ssize_t lastread;
- /* Seeks are not allowed on pipes. */
- ret = -ESPIPE;
- read = 0;
- if (ppos != &filp->f_pos)
- goto out_nolock;
+ /* pread is not allowed on pipes. */
+ if (unlikely(ppos != &filp->f_pos))
+ return -ESPIPE;
/* Always return 0 on null read. */
- ret = 0;
- if (count == 0)
- goto out_nolock;
-
- /* Get the pipe semaphore */
- ret = -ERESTARTSYS;
- if (down_interruptible(PIPE_SEM(*inode)))
- goto out_nolock;
-
- if (PIPE_EMPTY(*inode)) {
-do_more_read:
- ret = 0;
- if (!PIPE_WRITERS(*inode))
- goto out;
+ if (unlikely(count == 0))
+ return 0;
- ret = -EAGAIN;
- if (filp->f_flags & O_NONBLOCK)
- goto out;
+ down(PIPE_SEM(*inode));
- for (;;) {
- PIPE_WAITING_READERS(*inode)++;
- pipe_wait(inode);
- PIPE_WAITING_READERS(*inode)--;
- ret = -ERESTARTSYS;
- if (signal_pending(current))
- goto out;
- ret = 0;
- if (!PIPE_EMPTY(*inode))
- break;
- if (!PIPE_WRITERS(*inode))
+ read = 0;
+ lastread = 0;
+ for (;;) {
+ /* read what data is available */
+ int chars;
+ while( (chars = PIPE_LEN(*inode)) ) {
+ char *pipebuf = PIPE_BASE(*inode);
+ int offset = PIPE_START(*inode)%PIPE_BUF;
+ if (chars > count)
+ chars = count;
+ if (chars > PIPE_SIZE-offset)
+ chars = PIPE_SIZE-offset;
+ if (unlikely(copy_to_user(buf, pipebuf+offset, chars))) {
+ if (!read)
+ read = -EFAULT;
goto out;
+ }
+ PIPE_LEN(*inode) -= chars;
+ if (!PIPE_LEN(*inode)) {
+ /* Cache behaviour optimization */
+ PIPE_START(*inode) = 0;
+ } else {
+ /* there is no need to limit PIPE_START
+ * to PIPE_BUF - the user does
+ * %PIPE_BUF anyway.
+ */
+ PIPE_START(*inode) += chars;
+ }
+ read += chars;
+ count -= chars;
+ if (!count)
+ goto out; /* common case: done */
+ buf += chars;
+ /* Check again that the internal buffer is empty.
+ * If it was cyclic more data could be in the buffer.
+ */
}
- }
-
- /* Read what data is available. */
- ret = -EFAULT;
- while (count > 0 && (size = PIPE_LEN(*inode))) {
- char *pipebuf = PIPE_BASE(*inode) + PIPE_START(*inode);
- ssize_t chars = PIPE_MAX_RCHUNK(*inode);
-
- if (chars > count)
- chars = count;
- if (chars > size)
- chars = size;
+ if (PIPE_PIOLEN(*inode)) {
+ chars = copy_from_piolist(&PIPE_PIO(*inode), buf, count);
+ if (unlikely(chars < 0)) {
+ if (!read)
+ read = chars;
+ goto out;
+ }
+ PIPE_PIOLEN(*inode) -= chars;
+ read += chars;
+ count -= chars;
+ if (!count)
+ goto out; /* common case: done */
+ buf += chars;
- if (copy_to_user(buf, pipebuf, chars))
+ }
+ if (PIPE_PIOLEN(*inode) || PIPE_LEN(*inode)) BUG();
+ if (lastread != read) {
+ up(PIPE_SEM(*inode));
+ wake_up_interruptible(PIPE_WAIT(*inode));
+ sys_sched_yield();
+ schedule();
+ down(PIPE_SEM(*inode));
+ lastread = read;
+ continue;
+ }
+ /* tests before sleeping:
+ * - don't sleep if data was read.
+ */
+ if (read)
goto out;
- read += chars;
- PIPE_START(*inode) += chars;
- PIPE_START(*inode) &= (PIPE_SIZE - 1);
- PIPE_LEN(*inode) -= chars;
- count -= chars;
- buf += chars;
- }
-
- /* Cache behaviour optimization */
- if (!PIPE_LEN(*inode))
- PIPE_START(*inode) = 0;
+ /* - don't sleep if no process has the pipe open
+ * for writing
+ */
+ if (unlikely(!PIPE_WRITERS(*inode)))
+ goto out;
- if (count && PIPE_WAITING_WRITERS(*inode) && !(filp->f_flags & O_NONBLOCK)) {
- /*
- * We know that we are going to sleep: signal
- * writers synchronously that there is more
- * room.
+ /* - don't sleep if O_NONBLOCK is set */
+ if (filp->f_flags & O_NONBLOCK) {
+ read = -EAGAIN;
+ goto out;
+ }
+ /* - don't sleep if a signal is pending */
+ if (unlikely(signal_pending(current))) {
+ read = -ERESTARTSYS;
+ goto out;
+ }
+ /* readers never need to wake up if they go to sleep:
+ * They only sleep if they didn't read anything
*/
- wake_up_interruptible_sync(PIPE_WAIT(*inode));
- if (!PIPE_EMPTY(*inode))
- BUG();
- goto do_more_read;
+ pipe_wait(inode);
}
- /* Signal writers asynchronously that there is more room. */
- wake_up_interruptible(PIPE_WAIT(*inode));
-
- ret = read;
out:
up(PIPE_SEM(*inode));
-out_nolock:
- if (read)
- ret = read;
- return ret;
+ /* If we drained the pipe, then wakeup everyone
+ * waiting for that - either poll(2) or write(2).
+ * We are only reading, therefore we can access without locking.
+ */
+ if (read > 0 && !PIPE_PIOLEN(*inode) && !PIPE_LEN(*inode))
+ wake_up_interruptible(PIPE_WAIT(*inode));
+
+ return read;
}
static ssize_t
pipe_write(struct file *filp, const char *buf, size_t count, loff_t *ppos)
{
struct inode *inode = filp->f_dentry->d_inode;
- ssize_t free, written, ret;
-
- /* Seeks are not allowed on pipes. */
- ret = -ESPIPE;
- written = 0;
- if (ppos != &filp->f_pos)
- goto out_nolock;
+ size_t min;
+ ssize_t written;
+ int do_wakeup;
+
+ /* pwrite is not allowed on pipes. */
+ if (unlikely(ppos != &filp->f_pos))
+ return -ESPIPE;
/* Null write succeeds. */
- ret = 0;
- if (count == 0)
- goto out_nolock;
-
- ret = -ERESTARTSYS;
- if (down_interruptible(PIPE_SEM(*inode)))
- goto out_nolock;
+ if (unlikely(count == 0))
+ return 0;
+ min = count;
+ if (min > PIPE_BUF && (filp->f_flags & O_NONBLOCK))
+ min = 1; /* no atomicity guarantee for transfers > PIPE_BUF */
- /* No readers yields SIGPIPE. */
- if (!PIPE_READERS(*inode))
- goto sigpipe;
-
- /* If count <= PIPE_BUF, we have to make it atomic. */
- free = (count <= PIPE_BUF ? count : 1);
-
- /* Wait, or check for, available space. */
- if (filp->f_flags & O_NONBLOCK) {
- ret = -EAGAIN;
- if (PIPE_FREE(*inode) < free)
- goto out;
- } else {
- while (PIPE_FREE(*inode) < free) {
- PIPE_WAITING_WRITERS(*inode)++;
- pipe_wait(inode);
- PIPE_WAITING_WRITERS(*inode)--;
- ret = -ERESTARTSYS;
- if (signal_pending(current))
- goto out;
-
- if (!PIPE_READERS(*inode))
- goto sigpipe;
+ down(PIPE_SEM(*inode));
+ written = 0;
+ do_wakeup = 0;
+ for(;;) {
+ int start;
+ size_t chars;
+ /* No readers yields SIGPIPE. */
+ if (unlikely(!PIPE_READERS(*inode))) {
+ if (!written)
+ written = -EPIPE;
+ break;
}
- }
-
- /* Copy into available space. */
- ret = -EFAULT;
- while (count > 0) {
- int space;
- char *pipebuf = PIPE_BASE(*inode) + PIPE_END(*inode);
- ssize_t chars = PIPE_MAX_WCHUNK(*inode);
-
- if ((space = PIPE_FREE(*inode)) != 0) {
+ if (PIPE_PIOLEN(*inode))
+ goto skip_int_buf;
+ /* write to internal buffer - could be cyclic */
+ while(start = PIPE_LEN(*inode),chars = PIPE_SIZE - start, chars >= min) {
+ start += PIPE_START(*inode);
+ start %= PIPE_SIZE;
+ if (chars > PIPE_BUF - start)
+ chars = PIPE_BUF - start;
if (chars > count)
chars = count;
- if (chars > space)
- chars = space;
-
- if (copy_from_user(pipebuf, buf, chars))
+ if (unlikely(copy_from_user(PIPE_BASE(*inode)+start,
+ buf, chars))) {
+ if (!written)
+ written = -EFAULT;
goto out;
-
- written += chars;
+ }
+ do_wakeup = 1;
PIPE_LEN(*inode) += chars;
count -= chars;
+ written += chars;
+ if (!count)
+ goto out;
buf += chars;
- space = PIPE_FREE(*inode);
- continue;
+ min = 1;
}
-
- ret = written;
- if (filp->f_flags & O_NONBLOCK)
+skip_int_buf:
+ if (!filp->f_flags & O_NONBLOCK) {
+ if (!written)
+ written = -EAGAIN;
break;
+ }
- do {
- /*
- * Synchronous wake-up: it knows that this process
- * is going to give up this CPU, so it doesnt have
- * to do idle reschedules.
+ if (unlikely(signal_pending(current))) {
+ if (!written)
+ written = -ERESTARTSYS;
+ break;
+ }
+ {
+ struct pipe_pio my_pio;
+ /* build_pio
+ * wakeup readers:
+ * If the pipe was empty and now contains data, then do
+ * a wakeup. We will sleep --> sync wakeup.
*/
- wake_up_interruptible_sync(PIPE_WAIT(*inode));
- PIPE_WAITING_WRITERS(*inode)++;
+ build_pio(&my_pio, inode, buf, count);
+ if (do_wakeup || PIPE_PIO(*inode).next == &my_pio.list)
+ wake_up_sync(PIPE_WAIT(*inode));
+ do_wakeup = 0;
pipe_wait(inode);
- PIPE_WAITING_WRITERS(*inode)--;
- if (signal_pending(current))
- goto out;
- if (!PIPE_READERS(*inode))
- goto sigpipe;
- } while (!PIPE_FREE(*inode));
- ret = -EFAULT;
+ chars = teardown_pio(&my_pio, inode, buf);
+ count -= chars;
+ written += chars;
+ if (!count)
+ break;
+ buf += chars;
+ }
}
-
- /* Signal readers asynchronously that there is more data. */
- wake_up_interruptible(PIPE_WAIT(*inode));
-
- inode->i_ctime = inode->i_mtime = CURRENT_TIME;
- mark_inode_dirty(inode);
-
out:
+ if (written > 0) {
+ /* SuS V2: st_ctime and st_mtime are updated
+ * uppon successful completion of write(2).
+ */
+ inode->i_ctime = inode->i_mtime = CURRENT_TIME;
+ mark_inode_dirty(inode);
+ }
up(PIPE_SEM(*inode));
-out_nolock:
- if (written)
- ret = written;
- return ret;
-sigpipe:
- if (written)
- goto out;
- up(PIPE_SEM(*inode));
- send_sig(SIGPIPE, current, 0);
- return -EPIPE;
+ if (do_wakeup)
+ wake_up(PIPE_WAIT(*inode));
+ if (written == -EPIPE)
+ send_sig(SIGPIPE, current, 0);
+ return written;
}
static loff_t
@@ -270,7 +421,8 @@
{
switch (cmd) {
case FIONREAD:
- return put_user(PIPE_LEN(*pino), (int *)arg);
+ return put_user(PIPE_LEN(*filp->f_dentry->d_inode) +
+ PIPE_PIOLEN(*filp->f_dentry->d_inode), (int *)arg);
default:
return -EINVAL;
}
@@ -286,11 +438,20 @@
poll_wait(filp, PIPE_WAIT(*inode), wait);
/* Reading only -- no need for acquiring the semaphore. */
+
+ /*
+ * POLLIN means that data is available for read.
+ * POLLOUT means that a nonblocking write will succeed.
+ * We can only guarantee that if the internal buffers are empty
+ * Therefore both are mutually exclusive.
+ */
mask = POLLIN | POLLRDNORM;
- if (PIPE_EMPTY(*inode))
+ if (!PIPE_LEN(*inode) && !PIPE_PIOLEN(*inode))
mask = POLLOUT | POLLWRNORM;
+ /* POLLHUP: no writer, and there was at least once a writer */
if (!PIPE_WRITERS(*inode) && filp->f_version != PIPE_WCOUNTER(*inode))
mask |= POLLHUP;
+ /* POLLERR: no reader */
if (!PIPE_READERS(*inode))
mask |= POLLERR;
@@ -454,9 +615,9 @@
init_waitqueue_head(PIPE_WAIT(*inode));
PIPE_BASE(*inode) = (char*) page;
- PIPE_START(*inode) = PIPE_LEN(*inode) = 0;
+ INIT_LIST_HEAD(&PIPE_PIO(*inode));
+ PIPE_START(*inode) = PIPE_LEN(*inode) = PIPE_PIOLEN(*inode) = 0;
PIPE_READERS(*inode) = PIPE_WRITERS(*inode) = 0;
- PIPE_WAITING_READERS(*inode) = PIPE_WAITING_WRITERS(*inode) = 0;
PIPE_RCOUNTER(*inode) = PIPE_WCOUNTER(*inode) = 1;
return inode;
--- 2.5/include/linux/pipe_fs_i.h Sat Apr 28 10:37:27 2001
+++ build-2.5/include/linux/pipe_fs_i.h Wed Jan 2 00:56:14 2002
@@ -5,12 +5,12 @@
struct pipe_inode_info {
wait_queue_head_t wait;
char *base;
- unsigned int len;
+ size_t len; /* not including pio buffers */
+ size_t piolen;
unsigned int start;
+ struct list_head pio;
unsigned int readers;
unsigned int writers;
- unsigned int waiting_readers;
- unsigned int waiting_writers;
unsigned int r_counter;
unsigned int w_counter;
};
@@ -24,19 +24,15 @@
#define PIPE_BASE(inode) ((inode).i_pipe->base)
#define PIPE_START(inode) ((inode).i_pipe->start)
#define PIPE_LEN(inode) ((inode).i_pipe->len)
+#define PIPE_PIOLEN(inode) ((inode).i_pipe->piolen)
+#define PIPE_PIO(inode) ((inode).i_pipe->pio)
#define PIPE_READERS(inode) ((inode).i_pipe->readers)
#define PIPE_WRITERS(inode) ((inode).i_pipe->writers)
-#define PIPE_WAITING_READERS(inode) ((inode).i_pipe->waiting_readers)
-#define PIPE_WAITING_WRITERS(inode) ((inode).i_pipe->waiting_writers)
#define PIPE_RCOUNTER(inode) ((inode).i_pipe->r_counter)
#define PIPE_WCOUNTER(inode) ((inode).i_pipe->w_counter)
-#define PIPE_EMPTY(inode) (PIPE_LEN(inode) == 0)
-#define PIPE_FULL(inode) (PIPE_LEN(inode) == PIPE_SIZE)
#define PIPE_FREE(inode) (PIPE_SIZE - PIPE_LEN(inode))
#define PIPE_END(inode) ((PIPE_START(inode) + PIPE_LEN(inode)) & (PIPE_SIZE-1))
-#define PIPE_MAX_RCHUNK(inode) (PIPE_SIZE - PIPE_START(inode))
-#define PIPE_MAX_WCHUNK(inode) (PIPE_SIZE - PIPE_END(inode))
/* Drop the inode semaphore and wait for a pipe event, atomically */
void pipe_wait(struct inode * inode);
--- 2.5/fs/fifo.c Fri Feb 23 15:25:22 2001
+++ build-2.5/fs/fifo.c Wed Jan 2 00:56:14 2002
@@ -32,10 +32,8 @@
{
int ret;
- ret = -ERESTARTSYS;
- lock_kernel();
if (down_interruptible(PIPE_SEM(*inode)))
- goto err_nolock_nocleanup;
+ return -ERESTARTSYS;
if (!inode->i_pipe) {
ret = -ENOMEM;
@@ -116,7 +114,6 @@
/* Ok! */
up(PIPE_SEM(*inode));
- unlock_kernel();
return 0;
err_rd:
@@ -141,9 +138,6 @@
err_nocleanup:
up(PIPE_SEM(*inode));
-
-err_nolock_nocleanup:
- unlock_kernel();
return ret;
}