[LWN Logo]
[LWN.net]
From:	 Hubertus Franke <frankeh@watson.ibm.com>
To:	 Rusty Russell <rusty@rustcorp.com.au>
Subject: Re: [PATCH] Futex Generalization Patch
Date:	 Wed, 10 Apr 2002 10:24:42 -0400
Cc:	 linux-kernel@vger.kernel.org, Martin.Wirth@dlr.de,
	 pwaechler@loewe-Komp.de, drepper@redhat.com

On Saturday 06 April 2002 04:48 am, Rusty Russell wrote:
> In message <20020404162751.B0A253FE06@smtp.linux.ibm.com> you write:

Enclosed is an "asynchronous" extension to futexes.
This mechanism is required to implement global locks in the presence of an 
M:N threading package. 
Here "N" denotes a set of kernel threads, that serve as virtual processors on 
which the M user threads can be dispatched by the user level thread 
manager/scheduler.

Under no circumstances can one block the kernel threads through the futex 
interface. Instead, I want to indicate to the futex-subsystem, that I want a 
notification for a particular futex, when its woken up. 
Working with some M:N threading folks we came up with the following design 
implemented in the attached patch to Rusty's previously posted futex 
generalization patch. I can also provide a integrated patch over 2.5.7 
vanilla.

The futex_q structure is extended to hold a <tgid>, the <uaddr> of the futex 
and a signal with which to notify a caller.
We now provide a FUTEX_AWAIT call which attaches a futex_q object to the wait 
queue, similar the FUTEX_WAIT. Differences are as follows.
(a) futex_q must be dynamically allocated in the AWAIT case, instead of on 
the stack. We use the tgid to identify <async> vs. <sync> == 0.
(b) there will be no blocking in the kernel and hence we don't release the 
page upon return from this AWAIT call. 
(c) Upon wakeup, if the futex_q is an async wait (tgid != 0) we move it to a  
global notification queue and signal the <tgid> with the specified signal.
We release the page (put_page) at this point.
If the <tgid> doesn't exist (e.g. died), we delete futex_q and signal the 
next.
(d) A FUTEX_AWAKERS call is provided to the application to retrieve waiting 
notifications, typically called within the signal handler.

We need a cleanup mechanism for the notification queue, when a process dies 
prematurely before it had a chance to get their items out of the queue.

For this I introduced _exit_futex(task) to be called from do_exit().
It simply wakes the notifcation queue and deletes everything that is 
associated with its pid. Right now that is done for each exiting task.
In general one might want to think about a flag indicating whether this 
callback is necessary. 


For easier reading I also attach the futex.c program.

Comments.


-- 
-- Hubertus Franke  (frankeh@watson.ibm.com)
diff -urbN linux-2.5.7-futex/include/linux/futex.h linux-2.5.7-afutex/include/linux/futex.h
--- linux-2.5.7-futex/include/linux/futex.h	Tue Apr  9 16:19:59 2002
+++ linux-2.5.7-afutex/include/linux/futex.h	Tue Apr  9 12:28:36 2002
@@ -4,5 +4,7 @@
 /* Second argument to futex syscall */
 #define FUTEX_WAIT (0)
 #define FUTEX_WAKE (1)
+#define FUTEX_AWAIT   (2)
+#define FUTEX_AWAKERS (3)
 
 #endif
diff -urbN linux-2.5.7-futex/kernel/exit.c linux-2.5.7-afutex/kernel/exit.c
--- linux-2.5.7-futex/kernel/exit.c	Mon Mar 18 15:37:10 2002
+++ linux-2.5.7-afutex/kernel/exit.c	Tue Apr  9 14:07:45 2002
@@ -496,6 +496,10 @@
 #ifdef CONFIG_BSD_PROCESS_ACCT
 	acct_process(code);
 #endif
+	{
+	extern void __exit_futex(struct task_struct*);
+ 	__exit_futex(tsk);	
+	}
 	__exit_mm(tsk);
 
 	lock_kernel();
diff -urbN linux-2.5.7-futex/kernel/futex.c linux-2.5.7-afutex/kernel/futex.c
--- linux-2.5.7-futex/kernel/futex.c	Tue Apr  9 16:19:59 2002
+++ linux-2.5.7-afutex/kernel/futex.c	Wed Apr 10 09:54:10 2002
@@ -33,6 +33,7 @@
 #include <linux/futex.h>
 #include <linux/highmem.h>
 #include <linux/time.h>
+#include <linux/slab.h>        /* for kmalloc() */     
 #include <asm/uaccess.h>
 
 /* These mutexes are a very simple counter: the winner is the one who
@@ -49,15 +50,19 @@
    the relevent ones (hashed queues may be shared) */
 struct futex_q {
 	struct list_head list;
-	struct task_struct *task;
 	/* Page struct and offset within it. */
 	struct page *page;
 	unsigned int offset;
+	struct task_struct *task;
+	pid_t tgid;  /* "0" if synchronous futex */
+	void *uaddr; /* async wait on            */
+	int   sig;   /* signal to wakeup         */
 };
 
 /* The key for the hash is the address + index + offset within page */
 static struct list_head futex_queues[1<<FUTEX_HASHBITS];
 static spinlock_t futex_lock = SPIN_LOCK_UNLOCKED;
+static LIST_HEAD(notify_queue);
 
 static inline struct list_head *hash_futex(struct page *page,
 					   unsigned long offset)
@@ -69,50 +74,62 @@
 	return &futex_queues[hash_long(h, FUTEX_HASHBITS)];
 }
 
-static int futex_wake(struct list_head *head,
+
+/* Add at end to avoid starvation */
+static inline void queue_me(struct list_head *head,
+			    struct futex_q *q,
 		      struct page *page,
-		      unsigned int offset,
-		      int num)
+			    unsigned int offset)
 {
-	struct list_head *i, *next;
-	int num_woken = 0;
+	q->tgid   = 0;
+	q->task   = current;
+	q->page   = page;
+	q->offset = offset;
 
 	spin_lock(&futex_lock);
-	list_for_each_safe(i, next, head) {
-		struct futex_q *this = list_entry(i, struct futex_q, list);
+	list_add_tail(&q->list, head);
+	spin_unlock(&futex_lock);
+}
 
-		if (this->page == page && this->offset == offset) {
-			list_del_init(i);
-			wake_up_process(this->task);
-			num_woken++;
-			if (num_woken >= num) break;
-		}
+/* Return 1 if we were still queued (ie. 0 means we were woken) */
+static inline int unqueue_me(struct futex_q *q)
+{
+	int ret = 0;
+	spin_lock(&futex_lock);
+	if (!list_empty(&q->list)) {
+		list_del(&q->list);
+		ret = 1;
 	}
 	spin_unlock(&futex_lock);
-	return num_woken;
+	return ret;
 }
 
 /* Add at end to avoid starvation */
-static inline void queue_me(struct list_head *head,
+static inline void queue_me_async(struct list_head *head,
 			    struct futex_q *q,
 			    struct page *page,
-			    unsigned int offset)
+				  unsigned int offset,
+				  void *uaddr,
+				  int   sig)
 {
+	q->tgid   = current->tgid;
 	q->task = current;
 	q->page = page;
 	q->offset = offset;
+	q->uaddr  = uaddr;
+	q->sig    = sig;
 
 	spin_lock(&futex_lock);
 	list_add_tail(&q->list, head);
 	spin_unlock(&futex_lock);
 }
 
-/* Return 1 if we were still queued (ie. 0 means we were woken) */
-static inline int unqueue_me(struct futex_q *q)
+/* Return 1 if we were still queued in wait queue and not in notify queue */
+static inline int unqueue_me_async(struct futex_q *q)
 {
 	int ret = 0;
 	spin_lock(&futex_lock);
-	if (!list_empty(&q->list)) {
+	if ((q->page != NULL) && (!list_empty(&q->list))) {
 		list_del(&q->list);
 		ret = 1;
 	}
@@ -120,6 +137,47 @@
 	return ret;
 }
 
+static int futex_wake(struct list_head *head,
+		      struct page *page,
+		      unsigned int offset,
+		      int num)
+{
+
+	extern int sys_kill(int,int);
+
+	struct list_head *i, *next;
+	int num_woken = 0;
+
+	spin_lock(&futex_lock);
+	list_for_each_safe(i, next, head) {
+		struct futex_q *this = list_entry(i, struct futex_q, list);
+
+		if (this->page == page && this->offset == offset) {
+			list_del_init(i);
+			if (this->tgid == 0) {
+				/* synchronous */
+				wake_up_process(this->task);
+			} else {
+				/* move to notification queue, release page */
+				list_add_tail(&this->list, &notify_queue);
+				put_page(this->page);
+				this->page = NULL;
+
+				if (sys_kill(this->tgid,this->sig)) {
+					/* target is dead */
+					list_del(&this->list);
+					kfree(this);
+					continue;
+				}
+			}
+			num_woken++;
+			if (num_woken >= num) break;
+		}
+	}
+	spin_unlock(&futex_lock);
+	return num_woken;
+}
+
 /* Get kernel address of the user page and pin it. */
 static struct page *pin_page(unsigned long page_start)
 {
@@ -158,8 +216,10 @@
 	if (*count != val) {
 		ret = -EWOULDBLOCK;
 		set_current_state(TASK_RUNNING);
+		kunmap(page);
 		goto out;
 	}
+	kunmap(page);
 	time = schedule_timeout(time);
 	if (time == 0) {
 		ret = -ETIMEDOUT;
@@ -170,21 +230,100 @@
 		goto out;
 	}
  out:
-	kunmap(page);
 	/* Were we woken up anyway? */
 	if (!unqueue_me(&q))
 		return 0;
 	return ret;
 }
 
+static int futex_await(struct list_head *head,
+		       struct page *page,
+		       int offset,
+		       int val,
+		       void *uaddr,
+		       int sig)
+{
+	int *count;
+	struct futex_q *q = kmalloc(sizeof(struct futex_q),GFP_KERNEL);
+	int ret = 0;
+
+	set_current_state(TASK_INTERRUPTIBLE);
+	queue_me_async(head, q, page, offset, uaddr, sig);
+
+	count = kmap(page) + offset;
+	if (*count != val) {
+		set_current_state(TASK_RUNNING);
+		if (unqueue_me_async(q)) {
+			kfree(q);
+			ret = -EAGAIN;
+		}
+	}
+	kunmap(page);
+	return(ret);
+}
+
+static int futex_awaiters(void **uaddr, int num)
+{
+	struct list_head *i, *next;
+	int num_woken = 0;
+	int rc;
+
+	spin_lock(&futex_lock);
+	list_for_each_safe(i, next, &notify_queue) {
+		struct futex_q *this = list_entry(i, struct futex_q, list);
+
+		if (this->tgid == current->tgid) {
+			if (num_woken >= num) 
+				goto out;
+
+			if ((rc = put_user(this->uaddr,&uaddr[num_woken]))) {
+				/* all notifications selected sofar will be lost */
+				/* we could recreate them from **uaddr           */
+				num_woken = rc;
+				break;
+			}
+			list_del(i);
+			kfree(this);
+			num_woken++;
+		}
+	}
+ out:
+	spin_unlock(&futex_lock);
+	return num_woken;
+}
+
+/* cleanup called from do_exit */
+void __exit_futex(struct task_struct *task)
+{
+	struct list_head *i, *next;
+
+	spin_lock(&futex_lock);
+	list_for_each_safe(i, next, &notify_queue) {
+		struct futex_q *this = list_entry(i, struct futex_q, list);
+
+		if (this->tgid == task->pid) {
+			list_del(i);
+			kfree(this);
+		}
+	}
+	spin_unlock(&futex_lock);
+}
+
 asmlinkage int sys_futex(void *uaddr, int op, int val, struct timespec *utime)
 {
 	int ret;
 	unsigned long pos_in_page;
 	struct list_head *head;
 	struct page *page;
-	unsigned long time = MAX_SCHEDULE_TIMEOUT;
+	unsigned long time;
 
+	/* first handle the special case commands */
+	if (op == FUTEX_AWAKERS) {
+		return futex_awaiters((void**)uaddr, val);
+	}
+
+
+	time = MAX_SCHEDULE_TIMEOUT;
 	if (utime) {
 		struct timespec t;
 		if (copy_from_user(&t, utime, sizeof(t)) != 0)
@@ -212,11 +351,17 @@
 	case FUTEX_WAKE:
 		ret = futex_wake(head, page, pos_in_page, val);
 		break;
+	case FUTEX_AWAIT:
+		ret = futex_await(head, page, pos_in_page, val, uaddr, (int)utime);
+		if (!ret) 
+			goto out ;  /* don't release the page */
+		break;
 	default:
 		ret = -EINVAL;
 	}
 	put_page(page);
 
+out:
 	return ret;
 }
 
/*
 *  Fast Userspace Mutexes (which I call "Futexes!").
 *  (C) Rusty Russell, IBM 2002
 *
 *  Thanks to Ben LaHaise for yelling "hashed waitqueues" loudly
 *  enough at me, Linus for the original (flawed) idea, Matthew
 *  Kirkwood for proof-of-concept implementation.
 *
 *  "The futexes are also cursed."
 *  "But they come in a choice of three flavours!"
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 2 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to the Free Software
 *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 */
#include <linux/kernel.h>
#include <linux/spinlock.h>
#include <linux/sched.h>
#include <linux/mm.h>
#include <linux/hash.h>
#include <linux/init.h>
#include <linux/fs.h>
#include <linux/futex.h>
#include <linux/highmem.h>
#include <linux/time.h>
#include <linux/slab.h>        /* for kmalloc() */     
#include <asm/uaccess.h>

/* These mutexes are a very simple counter: the winner is the one who
   decrements from 1 to 0.  The counter starts at 1 when the lock is
   free.  A value other than 0 or 1 means someone may be sleeping.
   This is simple enough to work on all architectures, but has the
   problem that if we never "up" the semaphore it could eventually
   wrap around. */

/* FIXME: This may be way too small. --RR */
#define FUTEX_HASHBITS 6

/* We use this instead of a normal wait_queue_t, so we can wake only
   the relevent ones (hashed queues may be shared) */
struct futex_q {
	struct list_head list;
	/* Page struct and offset within it. */
	struct page *page;
	unsigned int offset;
	struct task_struct *task;
	pid_t tgid;  /* "0" if synchronous futex */
	void *uaddr; /* async wait on            */
	int   sig;   /* signal to wakeup         */
};

/* The key for the hash is the address + index + offset within page */
static struct list_head futex_queues[1<<FUTEX_HASHBITS];
static spinlock_t futex_lock = SPIN_LOCK_UNLOCKED;
static LIST_HEAD(notify_queue);

static inline struct list_head *hash_futex(struct page *page,
					   unsigned long offset)
{
	unsigned long h;

	/* struct page is shared, so we can hash on its address */
	h = (unsigned long)page + offset;
	return &futex_queues[hash_long(h, FUTEX_HASHBITS)];
}


/* Add at end to avoid starvation */
static inline void queue_me(struct list_head *head,
			    struct futex_q *q,
			    struct page *page,
			    unsigned int offset)
{
	q->tgid   = 0;
	q->task   = current;
	q->page   = page;
	q->offset = offset;

	spin_lock(&futex_lock);
	list_add_tail(&q->list, head);
	spin_unlock(&futex_lock);
}

/* Return 1 if we were still queued (ie. 0 means we were woken) */
static inline int unqueue_me(struct futex_q *q)
{
	int ret = 0;
	spin_lock(&futex_lock);
	if (!list_empty(&q->list)) {
		list_del(&q->list);
		ret = 1;
	}
	spin_unlock(&futex_lock);
	return ret;
}

/* Add at end to avoid starvation */
static inline void queue_me_async(struct list_head *head,
				  struct futex_q *q,
				  struct page *page,
				  unsigned int offset,
				  void *uaddr,
				  int   sig)
{
	q->tgid   = current->tgid;
	q->task   = current;
	q->page   = page;
	q->offset = offset;
	q->uaddr  = uaddr;
	q->sig    = sig;

	spin_lock(&futex_lock);
	list_add_tail(&q->list, head);
	spin_unlock(&futex_lock);
}

/* Return 1 if we were still queued in wait queue and not in notify queue */
static inline int unqueue_me_async(struct futex_q *q)
{
	int ret = 0;
	spin_lock(&futex_lock);
	if ((q->page != NULL) && (!list_empty(&q->list))) {
		list_del(&q->list);
		ret = 1;
	}
	spin_unlock(&futex_lock);
	return ret;
}

static int futex_wake(struct list_head *head,
		      struct page *page,
		      unsigned int offset,
		      int num)
{

	extern int sys_kill(int,int);

	struct list_head *i, *next;
	int num_woken = 0;

	spin_lock(&futex_lock);
	list_for_each_safe(i, next, head) {
		struct futex_q *this = list_entry(i, struct futex_q, list);

		if (this->page == page && this->offset == offset) {
			list_del_init(i);
			if (this->tgid == 0) {
				/* synchronous */
				wake_up_process(this->task);
			} else {
				/* move to notification queue, release page */
				list_add_tail(&this->list, &notify_queue);
				put_page(this->page);
				this->page = NULL;

				if (sys_kill(this->tgid,this->sig)) {
					/* target is dead */
					list_del(&this->list);
					kfree(this);
					continue;
				}
			}
			num_woken++;
			if (num_woken >= num) break;
		}
	}
	spin_unlock(&futex_lock);
	return num_woken;
}

/* Get kernel address of the user page and pin it. */
static struct page *pin_page(unsigned long page_start)
{
	struct mm_struct *mm = current->mm;
	struct page *page;
	int err;

	down_read(&mm->mmap_sem);
	err = get_user_pages(current, current->mm, page_start,
			     1 /* one page */,
			     1 /* writable */,
			     0 /* don't force */,
			     &page,
			     NULL /* don't return vmas */);
	up_read(&mm->mmap_sem);

	if (err < 0)
		return ERR_PTR(err);
	return page;
}

static int futex_wait(struct list_head *head,
		      struct page *page,
		      int offset,
		      int val,
		      unsigned long time)
{
	int *count;
	struct futex_q q;
	int ret = 0;

	set_current_state(TASK_INTERRUPTIBLE);
	queue_me(head, &q, page, offset);

	count = kmap(page) + offset;
	if (*count != val) {
		ret = -EWOULDBLOCK;
		set_current_state(TASK_RUNNING);
		kunmap(page);
		goto out;
	}
	kunmap(page);
	time = schedule_timeout(time);
	if (time == 0) {
		ret = -ETIMEDOUT;
		goto out;
	}
	if (signal_pending(current)) {
		ret = -EINTR;
		goto out;
	}
 out:
	/* Were we woken up anyway? */
	if (!unqueue_me(&q))
		return 0;
	return ret;
}

static int futex_await(struct list_head *head,
		       struct page *page,
		       int offset,
		       int val,
		       void *uaddr,
		       int sig)
{
	int *count;
	struct futex_q *q = kmalloc(sizeof(struct futex_q),GFP_KERNEL);
	int ret = 0;

	set_current_state(TASK_INTERRUPTIBLE);
	queue_me_async(head, q, page, offset, uaddr, sig);

	count = kmap(page) + offset;
	if (*count != val) {
		set_current_state(TASK_RUNNING);
		if (unqueue_me_async(q)) {
			kfree(q);
			ret = -EAGAIN;
		}
	}
	kunmap(page);
	return(ret);
}

static int futex_awaiters(void **uaddr, int num)
{
	struct list_head *i, *next;
	int num_woken = 0;
	int rc;

	spin_lock(&futex_lock);
	list_for_each_safe(i, next, &notify_queue) {
		struct futex_q *this = list_entry(i, struct futex_q, list);

		if (this->tgid == current->tgid) {
			if (num_woken >= num) 
				goto out;

			if ((rc = put_user(this->uaddr,&uaddr[num_woken]))) {
				/* all notifications selected sofar will be lost */
				/* we could recreate them from **uaddr           */
				num_woken = rc;
				break;
			}
			list_del(i);
			kfree(this);
			num_woken++;
		}
	}
 out:
	spin_unlock(&futex_lock);
	return num_woken;
}

/* cleanup called from do_exit */
void __exit_futex(struct task_struct *task)
{
	struct list_head *i, *next;

	spin_lock(&futex_lock);
	list_for_each_safe(i, next, &notify_queue) {
		struct futex_q *this = list_entry(i, struct futex_q, list);

		if (this->tgid == task->tgid) {
			list_del(i);
			kfree(this);
		}
	}
	spin_unlock(&futex_lock);
}

asmlinkage int sys_futex(void *uaddr, int op, int val, struct timespec *utime)
{
	int ret;
	unsigned long pos_in_page;
	struct list_head *head;
	struct page *page;
	unsigned long time;

	/* first handle the special case commands */
	if (op == FUTEX_AWAKERS) {
		return futex_awaiters((void**)uaddr, val);
	}


	time = MAX_SCHEDULE_TIMEOUT;
	if (utime) {
		struct timespec t;
		if (copy_from_user(&t, utime, sizeof(t)) != 0)
			return -EFAULT;
		time = timespec_to_jiffies(&t) + 1;
	}

	pos_in_page = ((unsigned long)uaddr) % PAGE_SIZE;

	/* Must be "naturally" aligned, and not on page boundary. */
	if ((pos_in_page % __alignof__(int)) != 0
	    || pos_in_page + sizeof(int) > PAGE_SIZE)
		return -EINVAL;

	/* Simpler if it doesn't vanish underneath us. */
	page = pin_page((unsigned long)uaddr - pos_in_page);
	if (IS_ERR(page))
		return PTR_ERR(page);

	head = hash_futex(page, pos_in_page);
	switch (op) {
	case FUTEX_WAIT:
		ret = futex_wait(head, page, pos_in_page, val, time);
		break;
	case FUTEX_WAKE:
		ret = futex_wake(head, page, pos_in_page, val);
		break;
	case FUTEX_AWAIT:
		ret = futex_await(head, page, pos_in_page, val, uaddr, (int)utime);
		if (!ret) 
			goto out ;  /* don't release the page */
		break;
	default:
		ret = -EINVAL;
	}
	put_page(page);

out:
	return ret;
}

static int __init init(void)
{
	unsigned int i;

	for (i = 0; i < ARRAY_SIZE(futex_queues); i++)
		INIT_LIST_HEAD(&futex_queues[i]);
	return 0;
}
__initcall(init);