From:	 "Manfred Spraul" <manfred@colorfullife.com>
To:	 <linux-kernel@vger.kernel.org>
Subject: zerocopy pipe, new version
Date:	 Sat, 5 Jan 2002 23:27:25 +0100
Cc:	 <lse-tech@lists.sourceforge.net>

Attached is a new version of the zerocopy pipe code.

A few months ago, Hubertus Franke found a severe performance problem with my last
version. I have now figured out how to solve it:
For good pipe performance, sys_read() must try to return as much data as possible in
one syscall, even if the writer only writes small chunks. The current code uses
PIPE_WAITING_WRITERS for that, but it doesn't work with nonblocking IO and is not very
efficient.
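
To make the scenario concrete, here is a purely illustrative user-space example (not part
of the patch, and unrelated to pipeflex): a writer producing many 64-byte chunks while the
reader asks for 32 kB per read(). The point of the change is that such a reader should get
as much of its request filled per syscall as possible.

/* Illustration only, not part of the patch: a writer producing small
 * chunks while the reader requests a large buffer per read().  With
 * the old code the reader tends to return after tiny amounts of data;
 * ideally each read() fills most of the 32 kB request.
 */
#include <unistd.h>
#include <string.h>

int main(void)
{
	int fd[2];
	char chunk[64], big[32768];
	int i;

	if (pipe(fd))
		return 1;
	memset(chunk, 'x', sizeof(chunk));
	if (fork() == 0) {			/* child: writer */
		close(fd[0]);
		for (i = 0; i < 4096; i++)
			write(fd[1], chunk, sizeof(chunk));
		_exit(0);
	}
	close(fd[1]);				/* parent: reader */
	while (read(fd[0], big, sizeof(big)) > 0)
		;				/* ideally few, large reads */
	return 0;
}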

I added a sched_yield() into that path, and that fixes the performance degradation:
if pipe_read() made progress and the user wants even more data, call sched_yield() and
give the writers a chance to write additional data. After sched_yield() returns, try again,
until either there is no more data or the user request is completely fulfilled. Only then
return to userspace.
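
Schematically, the new pipe_read() loop looks like this (heavily simplified from the actual
patch below; the pio list, O_NONBLOCK, signal and EOF handling are left out, and
copy_available_data_to_user() is just a stand-in for the internal-buffer copy_to_user() loop):

	/* Simplified sketch of the new read loop - see the patch for the
	 * real thing.  copy_available_data_to_user() is a stand-in for
	 * the copy_to_user() loop over the internal buffer.
	 */
	read = 0;
	for (;;) {
		lastread = read;
		read += copy_available_data_to_user(inode, buf + read,
						    count - read);
		if (read == count)
			break;			/* request fulfilled */
		if (read == lastread)
			break;			/* no progress: return or sleep */
		up(PIPE_SEM(*inode));
		wake_up_interruptible(PIPE_WAIT(*inode));
		sys_sched_yield();		/* give the writers a time slice */
		schedule();
		down(PIPE_SEM(*inode));
	}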

With that I now get +50% on UP with pipeflex -c2 -r32 -w1, comparing an SMP build of
2.5.2-pre8+zerocopy against a 2.4.2 UP kernel.

Unfortunately the patch has virtually no effect on the 4 kB write/4 kB read case, and that
is the most common one. (The number of context switches is cut in half, but there is a slight
performance loss on a K6, probably due to cache thrashing.)

The only solution I see for that problem is larger kernel buffers - more data must be queued
in the kernel. Either on demand (kmalloc() up to a sysctl limit, around 512 kB, if a request
for <= 4096 bytes arrives and would block; if the allocation fails, block as before), or at
pipe creation time as in Hubertus' patch.
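
For the on-demand variant, something along the following lines - purely a sketch, none of
this is in the attached patch: pipe_max_size stands for the proposed sysctl, a per-pipe
->size field is assumed instead of the fixed PIPE_SIZE, and ->base would then have to be
kmalloc()ed instead of being a single page:

/* Purely a sketch, not part of the attached patch: grow the in-kernel
 * buffer on demand instead of blocking a small write.
 */
static int pipe_enlarge(struct pipe_inode_info *info, size_t wanted)
{
	size_t newsize = info->size;
	size_t start, tail;
	char *newbuf;

	while (newsize < info->len + wanted && newsize < pipe_max_size)
		newsize <<= 1;
	if (newsize > pipe_max_size)
		newsize = pipe_max_size;
	if (newsize <= info->size)
		return -ENOMEM;		/* already at the limit */
	newbuf = kmalloc(newsize, GFP_KERNEL);
	if (!newbuf)
		return -ENOMEM;		/* allocation failed: block as before */

	/* unwrap the circular buffer into the new, larger one */
	start = info->start % info->size;
	tail = info->size - start;
	if (tail > info->len)
		tail = info->len;
	memcpy(newbuf, info->base + start, tail);
	memcpy(newbuf + tail, info->base, info->len - tail);

	kfree(info->base);
	info->base = newbuf;
	info->start = 0;
	info->size = newsize;
	return 0;
}

pipe_write() would call something like this before deciding to sleep, and simply fall back
to the current blocking behaviour when it returns -ENOMEM.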

--
    Manfred

// $Header$
// Kernel Version:
//  VERSION = 2
//  PATCHLEVEL = 5
//  SUBLEVEL = 2
//  EXTRAVERSION =-pre8
--- 2.5/fs/pipe.c	Thu Oct 11 15:20:16 2001
+++ build-2.5/fs/pipe.c	Wed Jan  2 18:12:28 2002
@@ -2,6 +2,9 @@
  *  linux/fs/pipe.c
  *
  *  Copyright (C) 1991, 1992, 1999  Linus Torvalds
+ *
+ *  Major pipe_read() and pipe_write() cleanup:  Single copy,
+ *  fewer schedules.	Copyright (C) 2001 Manfred Spraul
  */
 
 #include <linux/mm.h>
@@ -10,6 +13,8 @@
 #include <linux/slab.h>
 #include <linux/module.h>
 #include <linux/init.h>
+#include <linux/highmem.h>
+#include <linux/compiler.h>
 
 #include <asm/uaccess.h>
 #include <asm/ioctls.h>
@@ -36,214 +41,360 @@
 	down(PIPE_SEM(*inode));
 }
 
+#define PIO_PGCOUNT	((131072+PAGE_SIZE-1)/PAGE_SIZE)
+struct pipe_pio {
+	struct list_head list;
+	struct page *pages[PIO_PGCOUNT];
+	int offset;
+	size_t len;
+	size_t orig_len;
+	struct task_struct *tsk;
+};
+
+static ssize_t
+copy_from_piolist(struct list_head *piolist, void *buf, ssize_t len)
+{
+	struct list_head *walk = piolist->next;
+	int ret = 0;
+	while(walk != piolist && len) {
+		struct pipe_pio* pio = list_entry(walk, struct pipe_pio, list);
+		if (pio->len) {
+			struct page *page;
+			void *maddr;
+			int this_len, off, i;
+			int ret2;
+
+			i = pio->offset/PAGE_SIZE;
+			off = pio->offset%PAGE_SIZE;
+			this_len = len;
+			if (this_len > PAGE_SIZE-off)
+				this_len = PAGE_SIZE-off;
+			if (this_len > pio->len)
+				this_len = pio->len;
+
+			page = pio->pages[i];
+			maddr = kmap(page);
+			ret2 = copy_to_user(buf, maddr+off, this_len);
+			flush_page_to_ram(page);
+			kunmap(page);
+			if (unlikely(ret2)) {
+				if (ret)
+					return ret;
+				return -EFAULT;
+			}
+
+			buf += this_len;
+			len -= this_len;
+			pio->len -= this_len;
+			pio->offset += this_len;
+			ret += this_len;
+			if (pio->len == 0)
+				wake_up_process(pio->tsk);
+		} else {
+			walk = walk->next;
+		}
+	}
+	return ret;
+}
+
+static void
+build_pio(struct pipe_pio *pio, struct inode *inode, const void *buf, size_t count)
+{
+	int len;
+	struct vm_area_struct *vmas[PIO_PGCOUNT];
+
+	pio->tsk = current;
+	pio->len = count;
+	pio->offset = (unsigned long)buf&(PAGE_SIZE-1);
+
+	pio->len = PIO_PGCOUNT*PAGE_SIZE - pio->offset;
+	if (pio->len > count)
+		pio->len = count;
+	len = (pio->offset+pio->len+PAGE_SIZE-1)/PAGE_SIZE;
+	down_read(&current->mm->mmap_sem);
+	len = get_user_pages(current, current->mm, (unsigned long)buf, len,
+			0, 0, pio->pages, vmas);
+	if (len > 0) {
+		int i;
+		for(i=0;i<len;i++) {
+			flush_cache_page(vmas[i], (unsigned long)buf+i*PAGE_SIZE);
+		}
+		len = len*PAGE_SIZE-pio->offset;
+		if (len < pio->len)
+			pio->len = len;
+		list_add_tail(&pio->list, &PIPE_PIO(*inode));
+		PIPE_PIOLEN(*inode) += pio->len;
+		pio->orig_len = pio->len;
+	} else {
+		pio->list.next = NULL;
+	}
+	up_read(&current->mm->mmap_sem);
+}
+
+static size_t
+teardown_pio(struct pipe_pio *pio, struct inode *inode, const void *buf)
+{
+	int i;
+	if (!pio->list.next)
+		return 0;
+	for (i=0;i<(pio->len+pio->offset+PAGE_SIZE-1)/PAGE_SIZE;i++) {
+		if (pio->pages[i]) {
+			put_page(pio->pages[i]);
+		}
+	}
+	i = pio->orig_len - pio->len;
+	PIPE_PIOLEN(*inode) -= pio->len;
+	list_del(&pio->list);
+	if (i && pio->len) {
+		/*
+		 * We would violate the atomicity requirements:
+		 * 1 byte in the internal buffer.
+		 * write(fd, buf, PIPE_BUF);
+		 * --> doesn't fit into internal buffer, pio build.
+		 * read(fd, buf, 200);(i.e. 199 bytes from pio)
+		 * signal sent to writer.
+		 * The writer must not return with 199 bytes written!
+		 * Fortunately the internal buffer will be empty in this
+		 * case. Write into the internal buffer before
+		 * checking for signals/error conditions.
+		 */
+		size_t j = min((size_t)PIPE_SIZE, pio->len);
+		if (PIPE_LEN(*inode)) BUG();
+		if (PIPE_START(*inode)) BUG();
+		if (!copy_from_user(PIPE_BASE(*inode), buf + i, j)) {
+			i += j;
+			PIPE_LEN(*inode) =  j;
+		}
+	}
+	return i;
+}
+/*
+ * reader:
+	flush_cache_page(vma, addr);
+ *
+		flush_icache_page(vma, page);
+ */
 static ssize_t
 pipe_read(struct file *filp, char *buf, size_t count, loff_t *ppos)
 {
 	struct inode *inode = filp->f_dentry->d_inode;
-	ssize_t size, read, ret;
+	ssize_t read;
+	ssize_t lastread;
 
-	/* Seeks are not allowed on pipes.  */
-	ret = -ESPIPE;
-	read = 0;
-	if (ppos != &filp->f_pos)
-		goto out_nolock;
+	/* pread is not allowed on pipes.  */
+	if (unlikely(ppos != &filp->f_pos))
+		return -ESPIPE;
 
 	/* Always return 0 on null read.  */
-	ret = 0;
-	if (count == 0)
-		goto out_nolock;
-
-	/* Get the pipe semaphore */
-	ret = -ERESTARTSYS;
-	if (down_interruptible(PIPE_SEM(*inode)))
-		goto out_nolock;
-
-	if (PIPE_EMPTY(*inode)) {
-do_more_read:
-		ret = 0;
-		if (!PIPE_WRITERS(*inode))
-			goto out;
+	if (unlikely(count == 0))
+		return 0;
 
-		ret = -EAGAIN;
-		if (filp->f_flags & O_NONBLOCK)
-			goto out;
+	down(PIPE_SEM(*inode));
 
-		for (;;) {
-			PIPE_WAITING_READERS(*inode)++;
-			pipe_wait(inode);
-			PIPE_WAITING_READERS(*inode)--;
-			ret = -ERESTARTSYS;
-			if (signal_pending(current))
-				goto out;
-			ret = 0;
-			if (!PIPE_EMPTY(*inode))
-				break;
-			if (!PIPE_WRITERS(*inode))
+	read = 0;
+	lastread = 0;
+	for (;;) {
+		/* read what data is available */
+		int chars;
+		while( (chars = PIPE_LEN(*inode)) ) {
+			char *pipebuf = PIPE_BASE(*inode);
+			int offset = PIPE_START(*inode)%PIPE_BUF;
+			if (chars > count)
+				chars = count;
+			if (chars > PIPE_SIZE-offset)
+				chars = PIPE_SIZE-offset;
+			if (unlikely(copy_to_user(buf, pipebuf+offset, chars))) {
+				if (!read)
+					read = -EFAULT;
 				goto out;
+			}
+			PIPE_LEN(*inode) -= chars;
+			if (!PIPE_LEN(*inode)) {
+				/* Cache behaviour optimization */
+				PIPE_START(*inode) = 0;
+			} else {
+				/* there is no need to limit PIPE_START
+				 * to PIPE_BUF - the user does
+				 * %PIPE_BUF anyway.
+				 */
+				PIPE_START(*inode) += chars;
+			}
+			read += chars;
+			count -= chars;
+			if (!count)
+				goto out; /* common case: done */
+			buf += chars;
+			/* Check again that the internal buffer is empty.
+			 * If it was cyclic more data could be in the buffer.
+			 */
 		}
-	}
-
-	/* Read what data is available.  */
-	ret = -EFAULT;
-	while (count > 0 && (size = PIPE_LEN(*inode))) {
-		char *pipebuf = PIPE_BASE(*inode) + PIPE_START(*inode);
-		ssize_t chars = PIPE_MAX_RCHUNK(*inode);
-
-		if (chars > count)
-			chars = count;
-		if (chars > size)
-			chars = size;
+		if (PIPE_PIOLEN(*inode)) {
+			chars = copy_from_piolist(&PIPE_PIO(*inode), buf, count);
+			if (unlikely(chars < 0)) {
+				if (!read)
+					read = chars;
+				goto out;
+			}
+			PIPE_PIOLEN(*inode) -= chars;
+			read += chars;
+			count -= chars;
+			if (!count)
+				goto out; /* common case: done */
+			buf += chars;
 
-		if (copy_to_user(buf, pipebuf, chars))
+		}
+		if (PIPE_PIOLEN(*inode) || PIPE_LEN(*inode)) BUG();
+		if (lastread != read) {
+			up(PIPE_SEM(*inode));
+			wake_up_interruptible(PIPE_WAIT(*inode));
+			sys_sched_yield();
+			schedule();
+			down(PIPE_SEM(*inode));
+			lastread = read;
+			continue;
+		}
+		/* tests before sleeping:
+		 *  - don't sleep if data was read.
+		 */
+		if (read)
 			goto out;
 
-		read += chars;
-		PIPE_START(*inode) += chars;
-		PIPE_START(*inode) &= (PIPE_SIZE - 1);
-		PIPE_LEN(*inode) -= chars;
-		count -= chars;
-		buf += chars;
-	}
-
-	/* Cache behaviour optimization */
-	if (!PIPE_LEN(*inode))
-		PIPE_START(*inode) = 0;
+		/*  - don't sleep if no process has the pipe open
+		 *	 for writing
+		 */
+		if (unlikely(!PIPE_WRITERS(*inode)))
+			goto out;
 
-	if (count && PIPE_WAITING_WRITERS(*inode) && !(filp->f_flags & O_NONBLOCK)) {
-		/*
-		 * We know that we are going to sleep: signal
-		 * writers synchronously that there is more
-		 * room.
+		/*   - don't sleep if O_NONBLOCK is set */
+		if (filp->f_flags & O_NONBLOCK) {
+			read = -EAGAIN;
+			goto out;
+		}
+		/*   - don't sleep if a signal is pending */
+		if (unlikely(signal_pending(current))) {
+			read = -ERESTARTSYS;
+			goto out;
+		}
+		/* readers never need to do a wakeup before going to sleep:
+		 * they only sleep if they didn't read anything
 		 */
-		wake_up_interruptible_sync(PIPE_WAIT(*inode));
-		if (!PIPE_EMPTY(*inode))
-			BUG();
-		goto do_more_read;
+		pipe_wait(inode);
 	}
-	/* Signal writers asynchronously that there is more room.  */
-	wake_up_interruptible(PIPE_WAIT(*inode));
-
-	ret = read;
 out:
 	up(PIPE_SEM(*inode));
-out_nolock:
-	if (read)
-		ret = read;
-	return ret;
+	/* If we drained the pipe, then wakeup everyone
+	 * waiting for that - either poll(2) or write(2).
+	 * We are only reading, therefore we can access without locking.
+	 */
+	if (read > 0 && !PIPE_PIOLEN(*inode) && !PIPE_LEN(*inode))
+		wake_up_interruptible(PIPE_WAIT(*inode));
+
+	return read;
 }
 
 static ssize_t
 pipe_write(struct file *filp, const char *buf, size_t count, loff_t *ppos)
 {
 	struct inode *inode = filp->f_dentry->d_inode;
-	ssize_t free, written, ret;
-
-	/* Seeks are not allowed on pipes.  */
-	ret = -ESPIPE;
-	written = 0;
-	if (ppos != &filp->f_pos)
-		goto out_nolock;
+	size_t min;
+	ssize_t written;
+	int do_wakeup;
+
+	/* pwrite is not allowed on pipes.  */
+	if (unlikely(ppos != &filp->f_pos))
+		return -ESPIPE;
 
 	/* Null write succeeds.  */
-	ret = 0;
-	if (count == 0)
-		goto out_nolock;
-
-	ret = -ERESTARTSYS;
-	if (down_interruptible(PIPE_SEM(*inode)))
-		goto out_nolock;
+	if (unlikely(count == 0))
+		return 0;
+	min = count;
+	if (min > PIPE_BUF && (filp->f_flags & O_NONBLOCK))
+		min = 1; /* no atomicity guarantee for transfers > PIPE_BUF */
 
-	/* No readers yields SIGPIPE.  */
-	if (!PIPE_READERS(*inode))
-		goto sigpipe;
-
-	/* If count <= PIPE_BUF, we have to make it atomic.  */
-	free = (count <= PIPE_BUF ? count : 1);
-
-	/* Wait, or check for, available space.  */
-	if (filp->f_flags & O_NONBLOCK) {
-		ret = -EAGAIN;
-		if (PIPE_FREE(*inode) < free)
-			goto out;
-	} else {
-		while (PIPE_FREE(*inode) < free) {
-			PIPE_WAITING_WRITERS(*inode)++;
-			pipe_wait(inode);
-			PIPE_WAITING_WRITERS(*inode)--;
-			ret = -ERESTARTSYS;
-			if (signal_pending(current))
-				goto out;
-
-			if (!PIPE_READERS(*inode))
-				goto sigpipe;
+	down(PIPE_SEM(*inode));
+	written = 0;
+	do_wakeup = 0;
+	for(;;) {
+		int start;
+		size_t chars;
+		/* No readers yields SIGPIPE.  */
+		if (unlikely(!PIPE_READERS(*inode))) {
+			if (!written)
+				written = -EPIPE;
+			break;
 		}
-	}
-
-	/* Copy into available space.  */
-	ret = -EFAULT;
-	while (count > 0) {
-		int space;
-		char *pipebuf = PIPE_BASE(*inode) + PIPE_END(*inode);
-		ssize_t chars = PIPE_MAX_WCHUNK(*inode);
-
-		if ((space = PIPE_FREE(*inode)) != 0) {
+		if (PIPE_PIOLEN(*inode))
+			goto skip_int_buf;
+		/* write to internal buffer - could be cyclic */
+		while(start = PIPE_LEN(*inode),chars = PIPE_SIZE - start, chars >= min) {
+			start += PIPE_START(*inode);
+			start %= PIPE_SIZE;
+			if (chars > PIPE_BUF - start)
+				chars = PIPE_BUF - start;
 			if (chars > count)
 				chars = count;
-			if (chars > space)
-				chars = space;
-
-			if (copy_from_user(pipebuf, buf, chars))
+			if (unlikely(copy_from_user(PIPE_BASE(*inode)+start,
+						buf, chars))) {
+				if (!written)
+					written = -EFAULT;
 				goto out;
-
-			written += chars;
+			}
+			do_wakeup = 1;
 			PIPE_LEN(*inode) += chars;
 			count -= chars;
+			written += chars;
+			if (!count)
+				goto out;
 			buf += chars;
-			space = PIPE_FREE(*inode);
-			continue;
+			min = 1;
 		}
-
-		ret = written;
-		if (filp->f_flags & O_NONBLOCK)
+skip_int_buf:
+		if (filp->f_flags & O_NONBLOCK) {
+			if (!written)
+				written = -EAGAIN;
 			break;
+		}
 
-		do {
-			/*
-			 * Synchronous wake-up: it knows that this process
-			 * is going to give up this CPU, so it doesnt have
-			 * to do idle reschedules.
+		if (unlikely(signal_pending(current))) {
+			if (!written)
+				written = -ERESTARTSYS;
+			break;
+		}
+		{
+			struct pipe_pio my_pio;
+			/* build_pio
+			 * wakeup readers:
+			 * If the pipe was empty and now contains data, then do
+			 * a wakeup. We will sleep --> sync wakeup.
 			 */
-			wake_up_interruptible_sync(PIPE_WAIT(*inode));
-			PIPE_WAITING_WRITERS(*inode)++;
+			build_pio(&my_pio, inode, buf, count);
+			if (do_wakeup || PIPE_PIO(*inode).next == &my_pio.list)
+				wake_up_sync(PIPE_WAIT(*inode));
+			do_wakeup = 0;
 			pipe_wait(inode);
-			PIPE_WAITING_WRITERS(*inode)--;
-			if (signal_pending(current))
-				goto out;
-			if (!PIPE_READERS(*inode))
-				goto sigpipe;
-		} while (!PIPE_FREE(*inode));
-		ret = -EFAULT;
+			chars = teardown_pio(&my_pio, inode, buf);
+			count -= chars;
+			written += chars;
+			if (!count)
+				break;
+			buf += chars;
+		}
 	}
-
-	/* Signal readers asynchronously that there is more data.  */
-	wake_up_interruptible(PIPE_WAIT(*inode));
-
-	inode->i_ctime = inode->i_mtime = CURRENT_TIME;
-	mark_inode_dirty(inode);
-
 out:
+	if (written > 0) {
+		/* SuS V2: st_ctime and st_mtime are updated
+		 * upon successful completion of write(2).
+		 */
+		inode->i_ctime = inode->i_mtime = CURRENT_TIME;
+		mark_inode_dirty(inode);
+	}
 	up(PIPE_SEM(*inode));
-out_nolock:
-	if (written)
-		ret = written;
-	return ret;
 
-sigpipe:
-	if (written)
-		goto out;
-	up(PIPE_SEM(*inode));
-	send_sig(SIGPIPE, current, 0);
-	return -EPIPE;
+	if (do_wakeup)
+		wake_up(PIPE_WAIT(*inode));
+	if (written == -EPIPE)
+		send_sig(SIGPIPE, current, 0);
+	return written;
 }
 
 static loff_t
@@ -270,7 +421,8 @@
 {
 	switch (cmd) {
 		case FIONREAD:
-			return put_user(PIPE_LEN(*pino), (int *)arg);
+			return put_user(PIPE_LEN(*filp->f_dentry->d_inode) +
+					PIPE_PIOLEN(*filp->f_dentry->d_inode), (int *)arg);
 		default:
 			return -EINVAL;
 	}
@@ -286,11 +438,20 @@
 	poll_wait(filp, PIPE_WAIT(*inode), wait);
 
 	/* Reading only -- no need for acquiring the semaphore.  */
+
+	/* 
+	 * POLLIN means that data is available for read.
+	 * POLLOUT means that a nonblocking write will succeed.
+	 * We can only guarantee that if the internal buffers are empty;
+	 * therefore the two are mutually exclusive.
+	 */
 	mask = POLLIN | POLLRDNORM;
-	if (PIPE_EMPTY(*inode))
+	if (!PIPE_LEN(*inode) && !PIPE_PIOLEN(*inode))
 		mask = POLLOUT | POLLWRNORM;
+	/* POLLHUP: no writer, and there was at least once a writer */
 	if (!PIPE_WRITERS(*inode) && filp->f_version != PIPE_WCOUNTER(*inode))
 		mask |= POLLHUP;
+	/* POLLERR: no reader */
 	if (!PIPE_READERS(*inode))
 		mask |= POLLERR;
 
@@ -454,9 +615,9 @@
 
 	init_waitqueue_head(PIPE_WAIT(*inode));
 	PIPE_BASE(*inode) = (char*) page;
-	PIPE_START(*inode) = PIPE_LEN(*inode) = 0;
+	INIT_LIST_HEAD(&PIPE_PIO(*inode));
+	PIPE_START(*inode) = PIPE_LEN(*inode) = PIPE_PIOLEN(*inode) = 0;
 	PIPE_READERS(*inode) = PIPE_WRITERS(*inode) = 0;
-	PIPE_WAITING_READERS(*inode) = PIPE_WAITING_WRITERS(*inode) = 0;
 	PIPE_RCOUNTER(*inode) = PIPE_WCOUNTER(*inode) = 1;
 
 	return inode;
--- 2.5/include/linux/pipe_fs_i.h	Sat Apr 28 10:37:27 2001
+++ build-2.5/include/linux/pipe_fs_i.h	Wed Jan  2 00:56:14 2002
@@ -5,12 +5,12 @@
 struct pipe_inode_info {
 	wait_queue_head_t wait;
 	char *base;
-	unsigned int len;
+	size_t len;	/* not including pio buffers */
+	size_t piolen;
 	unsigned int start;
+	struct list_head pio;
 	unsigned int readers;
 	unsigned int writers;
-	unsigned int waiting_readers;
-	unsigned int waiting_writers;
 	unsigned int r_counter;
 	unsigned int w_counter;
 };
@@ -24,19 +24,15 @@
 #define PIPE_BASE(inode)	((inode).i_pipe->base)
 #define PIPE_START(inode)	((inode).i_pipe->start)
 #define PIPE_LEN(inode)		((inode).i_pipe->len)
+#define PIPE_PIOLEN(inode)	((inode).i_pipe->piolen)
+#define PIPE_PIO(inode)		((inode).i_pipe->pio)
 #define PIPE_READERS(inode)	((inode).i_pipe->readers)
 #define PIPE_WRITERS(inode)	((inode).i_pipe->writers)
-#define PIPE_WAITING_READERS(inode)	((inode).i_pipe->waiting_readers)
-#define PIPE_WAITING_WRITERS(inode)	((inode).i_pipe->waiting_writers)
 #define PIPE_RCOUNTER(inode)	((inode).i_pipe->r_counter)
 #define PIPE_WCOUNTER(inode)	((inode).i_pipe->w_counter)
 
-#define PIPE_EMPTY(inode)	(PIPE_LEN(inode) == 0)
-#define PIPE_FULL(inode)	(PIPE_LEN(inode) == PIPE_SIZE)
 #define PIPE_FREE(inode)	(PIPE_SIZE - PIPE_LEN(inode))
 #define PIPE_END(inode)	((PIPE_START(inode) + PIPE_LEN(inode)) & (PIPE_SIZE-1))
-#define PIPE_MAX_RCHUNK(inode)	(PIPE_SIZE - PIPE_START(inode))
-#define PIPE_MAX_WCHUNK(inode)	(PIPE_SIZE - PIPE_END(inode))
 
 /* Drop the inode semaphore and wait for a pipe event, atomically */
 void pipe_wait(struct inode * inode);
--- 2.5/fs/fifo.c	Fri Feb 23 15:25:22 2001
+++ build-2.5/fs/fifo.c	Wed Jan  2 00:56:14 2002
@@ -32,10 +32,8 @@
 {
 	int ret;
 
-	ret = -ERESTARTSYS;
-	lock_kernel();
 	if (down_interruptible(PIPE_SEM(*inode)))
-		goto err_nolock_nocleanup;
+		return -ERESTARTSYS;
 
 	if (!inode->i_pipe) {
 		ret = -ENOMEM;
@@ -116,7 +114,6 @@
 
 	/* Ok! */
 	up(PIPE_SEM(*inode));
-	unlock_kernel();
 	return 0;
 
 err_rd:
@@ -141,9 +138,6 @@
 
 err_nocleanup:
 	up(PIPE_SEM(*inode));
-
-err_nolock_nocleanup:
-	unlock_kernel();
 	return ret;
 }