patch-2.2.18 linux/net/sunrpc/sched.c

diff -u --new-file --recursive --exclude-from /usr/src/exclude v2.2.17/net/sunrpc/sched.c linux/net/sunrpc/sched.c
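
This patch converts the RPC scheduler's serialization from uniprocessor-style cli()/sti() interrupt masking to an SMP spinlock, rpc_queue_lock, taken in its irqsave form so that wait queues can still be touched from sk->write_space(). It also introduces per-task state bits (tk_active, tk_sleeping, tk_running), a task-pinning mechanism (tk_lock/tk_wakeup), and timer handling that can be cancelled while holding the lock. A minimal sketch of the locking conversion, using names from the hunks below (illustrative only, not part of the patch):

	static void queue_op_sketch(struct rpc_task *task)
	{
		unsigned long oldflags;

		/* was: save_flags(oldflags); cli(); */
		spin_lock_irqsave(&rpc_queue_lock, oldflags);
		__rpc_remove_wait_queue(task);	/* any wait-queue manipulation */
		/* was: restore_flags(oldflags); */
		spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
	}
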
@@ -31,7 +31,7 @@
  */
 #define GFP_RPC			GFP_NFS
 
-static void			__rpc_default_timer(struct rpc_task *task);
+static void			rpc_default_timer(struct rpc_task *task);
 static void			rpciod_killall(void);
 
 /*
@@ -68,12 +68,110 @@
 static int			rpc_inhibit = 0;
 
 /*
+ * Spinlock for wait queues. Access to the latter has to be interrupt-safe
+ * since we want to wake up tasks from sk->write_space().
+ */
+spinlock_t rpc_queue_lock = SPIN_LOCK_UNLOCKED;
+
+/*
  * This is the last-ditch buffer for NFS swap requests
  */
 static u32			swap_buffer[PAGE_SIZE >> 2];
 static int			swap_buffer_used = 0;
 
 /*
+ * Make allocation of the swap_buffer SMP-safe
+ */
+static __inline__ int rpc_lock_swapbuf(void)
+{
+	return !test_and_set_bit(1, &swap_buffer_used);
+}
+static __inline__ void rpc_unlock_swapbuf(void)
+{
+	clear_bit(1, &swap_buffer_used);
+}
+
+/*
+ * Disable the timer for a given RPC task. Should be called with
+ * rpc_queue_lock and bh_disabled in order to avoid races within
+ * rpc_run_timer().
+ */
+static inline void
+__rpc_disable_timer(struct rpc_task *task)
+{
+	dprintk("RPC: %4d disabling timer\n", task->tk_pid);
+	task->tk_timeout_fn = NULL;
+	task->tk_timeout = 0;
+}
+
+/*
+ * Run a timeout function.
+ * We use the callback in order to allow __rpc_wake_up_task()
+ * and friends to disable the timer synchronously on SMP systems
+ * without calling del_timer_sync(). The latter could cause a
+ * deadlock if called while we're holding spinlocks...
+ */
+static void
+rpc_run_timer(struct rpc_task *task)
+{
+	void (*callback)(struct rpc_task *);
+	unsigned long	oldflags;
+
+	spin_lock_irqsave(&rpc_queue_lock, oldflags);
+	callback = task->tk_timeout_fn;
+	task->tk_timeout_fn = NULL;
+	spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
+	if (callback) {
+		dprintk("RPC: %4d running timer\n", task->tk_pid);
+		callback(task);
+	}
+}
+
+/*
+ * Set up a timer for the current task.
+ */
+static inline void
+__rpc_add_timer(struct rpc_task *task, rpc_action timer)
+{
+	if (!task->tk_timeout)
+		return;
+
+	dprintk("RPC: %4d setting alarm for %lu ms\n",
+			task->tk_pid, task->tk_timeout * 1000 / HZ);
+
+	if (timer)
+		task->tk_timeout_fn = timer;
+	else
+		task->tk_timeout_fn = rpc_default_timer;
+	mod_timer(&task->tk_timer, jiffies + task->tk_timeout);
+}
+
+/*
+ * Set up a timer for an already sleeping task.
+ */
+void rpc_add_timer(struct rpc_task *task, rpc_action timer)
+{
+	unsigned long	oldflags;
+
+	spin_lock_irqsave(&rpc_queue_lock, oldflags);
+	if (!(RPC_IS_RUNNING(task) || task->tk_wakeup))
+		__rpc_add_timer(task, timer);
+	spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
+}
+
+/*
+ * Delete any timer for the current task.
+ */
+static inline void
+rpc_delete_timer(struct rpc_task *task)
+{
+	if (timer_pending(&task->tk_timer)) {
+		dprintk("RPC: %4d deleting timer\n", task->tk_pid);
+		del_timer(&task->tk_timer);
+	}
+}
+
+/*
  * Add new request to wait queue.
  *
  * Swapper tasks always get inserted at the head of the queue.
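
The indirection through tk_timeout_fn exists so a pending timeout can be cancelled while holding rpc_queue_lock: del_timer_sync() would spin waiting for the handler, which may itself be blocked on rpc_queue_lock, and deadlock. Clearing the pointer under the lock suffices because rpc_run_timer() re-reads it under the same lock. A condensed sketch of the canceller side (illustrative, not part of the patch):

	static void cancel_timeout_sketch(struct rpc_task *task)
	{
		unsigned long oldflags;

		spin_lock_irqsave(&rpc_queue_lock, oldflags);
		task->tk_timeout_fn = NULL;	/* what __rpc_disable_timer() does */
		spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
		/* If the timer has already fired, rpc_run_timer() finds NULL
		 * under the same lock and does nothing, so no del_timer_sync()
		 * is needed while spinlocks are held. */
	}
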
@@ -81,16 +179,17 @@
  * improve overall performance.
  * Everyone else gets appended to the queue to ensure proper FIFO behavior.
  */
-int
-rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
+static inline int
+__rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
 {
-	if (task->tk_rpcwait) {
-		if (task->tk_rpcwait != queue)
-		{
-			printk(KERN_WARNING "RPC: doubly enqueued task!\n");
-			return -EWOULDBLOCK;
-		}
+	if (task->tk_rpcwait == queue)
 		return 0;
+
+	if (task->tk_rpcwait) {
+		printk(KERN_WARNING "RPC: task already queued!\n");
+		dprintk("task already on %s, to be added to %s\n",
+			rpc_qname(task->tk_rpcwait), rpc_qname(queue));
+		return -EWOULDBLOCK;
 	}
 	if (RPC_IS_SWAPPER(task))
 		rpc_insert_list(&queue->task, task);
@@ -104,17 +203,30 @@
 	return 0;
 }
 
+int
+rpc_add_wait_queue(struct rpc_wait_queue *q, struct rpc_task *task)
+{
+	unsigned long	oldflags;
+	int		result;
+
+	spin_lock_irqsave(&rpc_queue_lock, oldflags);
+	result = __rpc_add_wait_queue(q, task);
+	spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
+	return result;
+}
+
 /*
  * Remove request from queue.
- * Note: must be called with interrupts disabled.
+ * Note: must be called with spin lock held.
  */
-void
-rpc_remove_wait_queue(struct rpc_task *task)
+static inline void
+__rpc_remove_wait_queue(struct rpc_task *task)
 {
-	struct rpc_wait_queue *queue;
+	struct rpc_wait_queue *queue = task->tk_rpcwait;
 
-	if (!(queue = task->tk_rpcwait))
+	if (!queue)
 		return;
+
 	rpc_remove_list(&queue->task, task);
 	task->tk_rpcwait = NULL;
 
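
From here on the patch follows one convention: a double-underscore function assumes the caller already holds rpc_queue_lock, and the plain-named function is a wrapper that acquires it. A generic sketch of the pattern (hypothetical operation, not from the patch):

	static inline void __rpc_some_op(struct rpc_task *task)
	{
		/* touch task->tk_rpcwait, timers, ... with the lock held */
	}

	void rpc_some_op(struct rpc_task *task)
	{
		unsigned long oldflags;

		spin_lock_irqsave(&rpc_queue_lock, oldflags);
		__rpc_some_op(task);
		spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
	}
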
@@ -122,73 +234,63 @@
 				task->tk_pid, queue, rpc_qname(queue));
 }
 
-/*
- * Set up a timer for the current task.
- */
-inline void
-rpc_add_timer(struct rpc_task *task, rpc_action timer)
+void
+rpc_remove_wait_queue(struct rpc_task *task)
 {
-	unsigned long	expires = jiffies + task->tk_timeout;
-
-	dprintk("RPC: %4d setting alarm for %lu ms\n",
-			task->tk_pid, task->tk_timeout * 1000 / HZ);
-	if (!timer)
-		timer = __rpc_default_timer;
-	if (time_before(expires, jiffies)) {
-		printk(KERN_ERR "RPC: bad timeout value %ld - setting to 10 sec!\n",
-					task->tk_timeout);
-		expires = jiffies + 10 * HZ;
-	}
-	task->tk_timer.expires  = expires;
-	task->tk_timer.data     = (unsigned long) task;
-	task->tk_timer.function = (void (*)(unsigned long)) timer;
-	task->tk_timer.prev     = NULL;
-	task->tk_timer.next     = NULL;
-	add_timer(&task->tk_timer);
-}
+	unsigned long	oldflags;
 
-/*
- * Delete any timer for the current task.
- * Must be called with interrupts off.
- */
-inline void
-rpc_del_timer(struct rpc_task *task)
-{
-	if (task->tk_timeout) {
-		dprintk("RPC: %4d deleting timer\n", task->tk_pid);
-		del_timer(&task->tk_timer);
-		task->tk_timeout = 0;
-	}
+	if (!task->tk_rpcwait)
+		return;
+	spin_lock_irqsave(&rpc_queue_lock, oldflags);
+	__rpc_remove_wait_queue(task);
+	spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
 }
 
 /*
  * Make an RPC task runnable.
  *
  * Note: If the task is ASYNC, this must be called with 
- * interrupts disabled to protect the wait queue operation.
+ * spin lock held in order to protect the wait queue operation.
  */
 static inline void
-rpc_make_runnable(struct rpc_task *task)
+__rpc_make_runnable(struct rpc_task *task)
 {
 	if (task->tk_timeout) {
 		printk(KERN_ERR "RPC: task w/ running timer in rpc_make_runnable!!\n");
 		return;
 	}
-	task->tk_flags |= RPC_TASK_RUNNING;
+	task->tk_running = 1;
 	if (RPC_IS_ASYNC(task)) {
-		int status;
-		status = rpc_add_wait_queue(&schedq, task);
-		if (status)
-		{
-			printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
-			task->tk_status = status;
+		if (RPC_IS_SLEEPING(task)) {
+			int status;
+			status = __rpc_add_wait_queue(&schedq, task);
+			if (status < 0) {
+				printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
+				task->tk_status = status;
+			} else
+				task->tk_sleeping = 0;
 		}
 		wake_up(&rpciod_idle);
 	} else {
+		task->tk_sleeping = 0;
 		wake_up(&task->tk_wait);
 	}
 }
 
+/*
+ * Place a newly initialized task on the schedq.
+ */
+static inline void
+__rpc_schedule_run(struct rpc_task *task)
+{
+	/* Don't run a child twice! */
+	if (RPC_IS_ACTIVATED(task))
+		return;
+	task->tk_active = 1;
+	task->tk_sleeping = 1;
+	__rpc_make_runnable(task);
+}
+
 
 /*
  *	For other people who may need to wake the I/O daemon
@@ -198,9 +300,7 @@
 void rpciod_wake_up(void)
 {
 	if(rpciod_pid==0)
-	{
 		printk(KERN_ERR "rpciod: wot no daemon?\n");
-	}
 	wake_up(&rpciod_idle);
 }
 
@@ -214,33 +314,32 @@
 __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
 			rpc_action action, rpc_action timer)
 {
-	unsigned long	oldflags;
 	int status;
 
 	dprintk("RPC: %4d sleep_on(queue \"%s\" time %ld)\n", task->tk_pid,
 				rpc_qname(q), jiffies);
 
-	/*
-	 * Protect the execution below.
-	 */
-	save_flags(oldflags); cli();
+	if (!RPC_IS_ASYNC(task) && !RPC_IS_ACTIVATED(task)) {
+		printk(KERN_ERR "RPC: Inactive synchronous task put to sleep!\n");
+		return;
+	}
+
+	/* Mark the task as being activated if so needed */
+	if (!RPC_IS_ACTIVATED(task)) {
+		task->tk_active = 1;
+		task->tk_sleeping = 1;
+	}
 
-	status = rpc_add_wait_queue(q, task);
-	if (status)
-	{
+	status = __rpc_add_wait_queue(q, task);
+	if (status) {
 		printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
 		task->tk_status = status;
-		task->tk_flags |= RPC_TASK_RUNNING;
-	}
-	else
-	{
+	} else {
+		task->tk_running = 0;
 		task->tk_callback = action;
-		if (task->tk_timeout)
-			rpc_add_timer(task, timer);
-		task->tk_flags &= ~RPC_TASK_RUNNING;
+		__rpc_add_timer(task, timer);
 	}
 
-	restore_flags(oldflags);
 	return;
 }
 
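
The RPC_TASK_RUNNING flag juggling gives way to three per-task state bits. The predicates live in include/linux/sunrpc/sched.h rather than in this file; presumably they are thin wrappers along these lines (an assumption, since the header is not part of this diff):

	#define RPC_IS_RUNNING(t)	((t)->tk_running)	/* may execute tk_action */
	#define RPC_IS_SLEEPING(t)	((t)->tk_sleeping)	/* parked, awaiting wakeup */
	#define RPC_IS_ACTIVATED(t)	((t)->tk_active)	/* has entered the scheduler */
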
@@ -248,17 +347,39 @@
 rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
 				rpc_action action, rpc_action timer)
 {
+	unsigned long	oldflags;
+
+	/*
+	 * Protect the queue operations.
+	 */
+	spin_lock_irqsave(&rpc_queue_lock, oldflags);
 	__rpc_sleep_on(q, task, action, timer);
+	spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
+}
+
+void
+rpc_sleep_locked(struct rpc_wait_queue *q, struct rpc_task *task,
+		 rpc_action action, rpc_action timer)
+{
+	unsigned long	oldflags;
+
+	/*
+	 * Protect the queue operations.
+	 */
+	spin_lock_irqsave(&rpc_queue_lock, oldflags);
+	__rpc_sleep_on(q, task, action, timer);
+	__rpc_lock_task(task);
+	spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
 }
 
 /*
- * Wake up a single task -- must be invoked with bottom halves off.
+ * Wake up a single task -- must be invoked with spin lock held.
  *
  * It would probably suffice to cli/sti the del_timer and remove_wait_queue
  * operations individually.
  */
-static void
-__rpc_wake_up(struct rpc_task *task)
+static inline void
+__rpc_wake_up_task(struct rpc_task *task)
 {
 	dprintk("RPC: %4d __rpc_wake_up (now %ld inh %d)\n",
 					task->tk_pid, jiffies, rpc_inhibit);
@@ -267,16 +388,32 @@
 	if (task->tk_magic != 0xf00baa) {
 		printk(KERN_ERR "RPC: attempt to wake up non-existing task!\n");
 		rpc_debug = ~0;
+		rpc_show_tasks();
 		return;
 	}
 #endif
-	rpc_del_timer(task);
-	if (task->tk_rpcwait != &schedq)
-		rpc_remove_wait_queue(task);
-	if (!RPC_IS_RUNNING(task)) {
-		task->tk_flags |= RPC_TASK_CALLBACK;
-		rpc_make_runnable(task);
+	/* Has the task been executed yet? If not, we cannot wake it up! */
+	if (!RPC_IS_ACTIVATED(task)) {
+		printk(KERN_ERR "RPC: Inactive task (%p) being woken up!\n", task);
+		return;
 	}
+	if (RPC_IS_RUNNING(task))
+		return;
+
+	__rpc_disable_timer(task);
+	if (task->tk_rpcwait != &schedq)
+		__rpc_remove_wait_queue(task);
+
+	/* If the task has been locked, then set tk_wakeup so that
+	 * rpc_unlock_task() wakes us up... */
+	if (task->tk_lock) {
+		task->tk_wakeup = 1;
+		return;
+	} else
+		task->tk_wakeup = 0;
+
+	__rpc_make_runnable(task);
+
 	dprintk("RPC:      __rpc_wake_up done\n");
 }
 
@@ -284,12 +421,12 @@
  * Default timeout handler if none specified by user
  */
 static void
-__rpc_default_timer(struct rpc_task *task)
+rpc_default_timer(struct rpc_task *task)
 {
-	dprintk("RPC: %d timeout (default timer)\n", task->tk_pid);
+	dprintk("RPC: %4d timeout (default timer)\n", task->tk_pid);
 	task->tk_status = -ETIMEDOUT;
 	task->tk_timeout = 0;
-	__rpc_wake_up(task);
+	rpc_wake_up_task(task);
 }
 
 /*
@@ -300,26 +437,38 @@
 {
 	unsigned long	oldflags;
 
-	save_flags(oldflags); cli();
-	__rpc_wake_up(task);
-	restore_flags(oldflags);
+	if (RPC_IS_RUNNING(task))
+		return;
+
+	spin_lock_irqsave(&rpc_queue_lock, oldflags);
+	__rpc_wake_up_task(task);
+	spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
 }
 
 /*
  * Wake up the next task on the wait queue.
  */
-struct rpc_task *
-rpc_wake_up_next(struct rpc_wait_queue *queue)
+static inline struct rpc_task *
+__rpc_wake_up_next(struct rpc_wait_queue *queue)
 {
-	unsigned long	oldflags;
 	struct rpc_task	*task;
 
 	dprintk("RPC:      wake_up_next(%p \"%s\")\n", queue, rpc_qname(queue));
-	save_flags(oldflags); cli();
 	if ((task = queue->task) != 0)
-		__rpc_wake_up(task);
-	restore_flags(oldflags);
+		__rpc_wake_up_task(task);
+
+	return task;
+}
+
+struct rpc_task *
+rpc_wake_up_next(struct rpc_wait_queue *queue)
+{
+	struct rpc_task	*task;
+	unsigned long	oldflags;
 
+	spin_lock_irqsave(&rpc_queue_lock, oldflags);
+	task = __rpc_wake_up_next(queue);
+	spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
 	return task;
 }
 
@@ -331,10 +480,10 @@
 {
 	unsigned long	oldflags;
 
-	save_flags(oldflags); cli();
+	spin_lock_irqsave(&rpc_queue_lock, oldflags);
 	while (queue->task)
-		__rpc_wake_up(queue->task);
-	restore_flags(oldflags);
+		__rpc_wake_up_task(queue->task);
+	spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
 }
 
 /*
@@ -346,30 +495,63 @@
 	struct rpc_task	*task;
 	unsigned long	oldflags;
 
-	save_flags(oldflags); cli();
+	spin_lock_irqsave(&rpc_queue_lock, oldflags);
 	while ((task = queue->task) != NULL) {
 		task->tk_status = status;
-		__rpc_wake_up(task);
+		__rpc_wake_up_task(task);
 	}
-	restore_flags(oldflags);
+	spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
+}
+
+/*
+ * Lock down a sleeping task to prevent it from waking up
+ * and disappearing from beneath us.
+ *
+ * This function should always be called with the
+ * rpc_queue_lock held.
+ */
+int
+__rpc_lock_task(struct rpc_task *task)
+{
+	if (!RPC_IS_RUNNING(task))
+		return ++task->tk_lock;
+	return 0;
+}
+
+static inline void
+__rpc_unlock_task(struct rpc_task *task)
+{
+	if (task->tk_lock && !--task->tk_lock && task->tk_wakeup)
+		__rpc_wake_up_task(task);
+}
+
+void
+rpc_unlock_task(struct rpc_task *task)
+{
+	unsigned long	oldflags;
+
+	spin_lock_irqsave(&rpc_queue_lock, oldflags);
+	__rpc_unlock_task(task);
+	spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
 }
 
 /*
  * Run a task at a later time
  */
-static void	__rpc_atrun(struct rpc_task *);
+static void	rpc_atrun(struct rpc_task *);
 void
 rpc_delay(struct rpc_task *task, unsigned long delay)
 {
 	task->tk_timeout = delay;
-	rpc_sleep_on(&delay_queue, task, NULL, __rpc_atrun);
+	rpc_sleep_on(&delay_queue, task, NULL, rpc_atrun);
 }
 
 static void
-__rpc_atrun(struct rpc_task *task)
+rpc_atrun(struct rpc_task *task)
 {
 	task->tk_status = 0;
-	__rpc_wake_up(task);
+	task->tk_timeout = 0;
+	rpc_wake_up_task(task);
 }
 
 /*
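
tk_lock pins a sleeping task so it cannot be woken and released while a caller temporarily drops rpc_queue_lock; a racing wakeup merely sets tk_wakeup and is replayed by rpc_unlock_task(). A usage sketch (hypothetical caller, not from the patch):

	static void pin_task_sketch(struct rpc_task *task)
	{
		unsigned long oldflags;

		spin_lock_irqsave(&rpc_queue_lock, oldflags);
		if (!__rpc_lock_task(task)) {	/* task is running: nothing to pin */
			spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
			return;
		}
		spin_unlock_irqrestore(&rpc_queue_lock, oldflags);

		/* ... safe to reference task here; it cannot disappear ... */

		rpc_unlock_task(task);	/* replays any deferred wakeup */
	}
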
@@ -389,15 +571,15 @@
 		return 0;
 	}
 
+restarted:
 	while (1) {
 		/*
 		 * Execute any pending callback.
 		 */
-		if (task->tk_flags & RPC_TASK_CALLBACK) {
+		if (RPC_DO_CALLBACK(task)) {
 			/* Define a callback save pointer */
 			void (*save_callback)(struct rpc_task *);
 	
-			task->tk_flags &= ~RPC_TASK_CALLBACK;
 			/* 
 			 * If a callback exists, save it, reset it,
 			 * call it.
@@ -405,11 +587,9 @@
 			 * another callback set within the callback handler
 			 * - Dave
 			 */
-			if (task->tk_callback) {
-				save_callback=task->tk_callback;
-				task->tk_callback=NULL;
-				save_callback(task);
-			}
+			save_callback=task->tk_callback;
+			task->tk_callback=NULL;
+			save_callback(task);
 		}
 
 		/*
@@ -418,6 +598,10 @@
 		 * by someone else.
 		 */
 		if (RPC_IS_RUNNING(task)) {
+			/*
+			 * Garbage collection of pending timers...
+			 */
+			rpc_delete_timer(task);
 			if (!task->tk_action)
 				break;
 			task->tk_action(task);
@@ -425,86 +609,98 @@
 
 		/*
 		 * Check whether task is sleeping.
-		 * Note that if the task may go to sleep in tk_action,
+		 * Note that if the task goes to sleep in tk_action,
 		 * and the RPC reply arrives before we get here, it will
 		 * have state RUNNING, but will still be on schedq.
+		 * 27/9/99: The above has been attempted fixed by
+		 *          introduction of task->tk_sleeping.
 		 */
-		save_flags(oldflags); cli();
-		if (RPC_IS_RUNNING(task)) {
-			if (task->tk_rpcwait == &schedq)
-				rpc_remove_wait_queue(task);
-		} else while (!RPC_IS_RUNNING(task)) {
+		spin_lock_irqsave(&rpc_queue_lock, oldflags);
+		if (!RPC_IS_RUNNING(task)) {
+			task->tk_sleeping = 1;
 			if (RPC_IS_ASYNC(task)) {
-				restore_flags(oldflags);
+				spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
 				return 0;
 			}
+		} else
+			task->tk_sleeping = 0;
+		spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
 
+		while (RPC_IS_SLEEPING(task)) {
 			/* sync task: sleep here */
 			dprintk("RPC: %4d sync task going to sleep\n",
 							task->tk_pid);
-			if (current->pid == rpciod_pid)
-				printk(KERN_ERR "RPC: rpciod waiting on sync task!\n");
+			if (current->pid == rpciod_pid) {
+				printk(KERN_ERR "RPC: rpciod waiting on sync task %4d!\n", task->tk_pid);
+				rpc_show_tasks();
+			}
 
-			__wait_event(task->tk_wait, RPC_IS_RUNNING(task));
+			__wait_event(task->tk_wait, !RPC_IS_SLEEPING(task));
+			dprintk("RPC: %4d sync task resuming\n", task->tk_pid);
 
 			/*
-			 * When the task received a signal, remove from
-			 * any queues etc, and make runnable again.
+			 * When a sync task receives a signal, it exits with
+			 * -ERESTARTSYS. In order to catch any callbacks that
+			 * clean up after sleeping on some queue, we don't
+			 * break the loop here, but go around once more.
 			 */
-			if (signalled()) { 
-				cli(); 
-				__rpc_wake_up(task);
+			if (task->tk_client->cl_intr && signalled()) {
+				dprintk("RPC: %4d got signal\n", task->tk_pid);
+				task->tk_flags |= RPC_TASK_KILLED;
+				rpc_exit(task, -ERESTARTSYS);
+				rpc_wake_up_task(task);
 			}
-
-			dprintk("RPC: %4d sync task resuming\n",
-							task->tk_pid);
-		}
-		restore_flags(oldflags);
-
-		/*
-		 * When a sync task receives a signal, it exits with
-		 * -ERESTARTSYS. In order to catch any callbacks that
-		 * clean up after sleeping on some queue, we don't
-		 * break the loop here, but go around once more.
-		 */
-		if (!RPC_IS_ASYNC(task) && signalled()) {
-			dprintk("RPC: %4d got signal\n", task->tk_pid);
-			rpc_exit(task, -ERESTARTSYS);
 		}
 	}
 
 	dprintk("RPC: %4d exit() = %d\n", task->tk_pid, task->tk_status);
 	if (task->tk_exit) {
-		status = task->tk_status;
 		task->tk_exit(task);
+		/* If tk_action is non-null, the user wants us to restart */
+		if (task->tk_action) {
+			if (!RPC_ASSASSINATED(task)) {
+					/* Release RPC slot and buffer memory */
+					if (task->tk_rqstp)
+						xprt_release(task);
+					if (task->tk_buffer) {
+						rpc_free(task->tk_buffer);
+						task->tk_buffer = NULL;
+					}
+					goto restarted;
+			}
+			printk(KERN_ERR "RPC: dead task tries to walk away.\n");
+		}
 	}
 
+	/* Save the task exit status */
+	status = task->tk_status;
+
+	/* Release all resources associated with the task */
+	rpc_release_task(task);
 	return status;
 }
 
 /*
  * User-visible entry point to the scheduler.
- * The recursion protection is for debugging. It should go away once
- * the code has stabilized.
+ *
+ * This may be called recursively if e.g. an async NFS task updates
+ * the attributes and finds that dirty pages must be flushed.
  */
-void
+int
 rpc_execute(struct rpc_task *task)
 {
-	static int	executing = 0;
-	int		incr = RPC_IS_ASYNC(task)? 1 : 0;
-
-	if (incr) {
-		if (rpc_inhibit) {
-			printk(KERN_INFO "RPC: execution inhibited!\n");
-			return;
-		}
-		if (executing)
-			printk(KERN_WARNING "RPC: %d tasks executed\n", executing);
+	if (rpc_inhibit) {
+		printk(KERN_INFO "RPC: execution inhibited!\n");
+		return -EIO;
+	}
+	task->tk_running = 1;
+	if (task->tk_active) {
+		printk(KERN_ERR "RPC: active task was run twice!\n");
+		return -EWOULDBLOCK;
 	}
+	task->tk_active = 1;
 	
-	executing += incr;
-	__rpc_execute(task);
-	executing -= incr;
+	return __rpc_execute(task);
 }
 
 /*
@@ -514,31 +710,36 @@
 __rpc_schedule(void)
 {
 	struct rpc_task	*task;
-	int		count = 0;
 	unsigned long	oldflags;
-	int need_resched = current->need_resched;
+	int		count = 0;
 
 	dprintk("RPC:      rpc_schedule enter\n");
-	save_flags(oldflags);
 	while (1) {
-		cli();
-		if (!(task = schedq.task))
+		/* Ensure equal rights for tcp tasks... */
+		rpciod_tcp_dispatcher();
+
+		spin_lock_irqsave(&rpc_queue_lock, oldflags);
+		if (!(task = schedq.task)) {
+			spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
 			break;
-		rpc_del_timer(task);
-		rpc_remove_wait_queue(task);
-		task->tk_flags |= RPC_TASK_RUNNING;
-		restore_flags(oldflags);
+		}
+		if (task->tk_lock) {
+			spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
+			printk(KERN_ERR "RPC: Locked task was scheduled !!!!\n");
+			rpc_debug = ~0;
+			rpc_show_tasks();
+			break;
+		}
+		__rpc_remove_wait_queue(task);
+		spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
 
 		__rpc_execute(task);
 
-		if (++count >= 200) {
+		if (++count >= 200 || current->need_resched) {
+			schedule();
 			count = 0;
-			need_resched = 1;
 		}
-		if (need_resched)
-			schedule();
 	}
-	restore_flags(oldflags);
 	dprintk("RPC:      rpc_schedule leave\n");
 }
 
@@ -579,7 +780,8 @@
 			dprintk("RPC:      allocated buffer %p\n", buffer);
 			return buffer;
 		}
-		if ((flags & RPC_TASK_SWAPPER) && !swap_buffer_used++) {
+		if ((flags & RPC_TASK_SWAPPER) && size <= sizeof(swap_buffer)
+		 && rpc_lock_swapbuf()) {
 			dprintk("RPC:      used last-ditch swap buffer\n");
 			return swap_buffer;
 		}
@@ -587,6 +789,7 @@
 			return NULL;
 		current->state = TASK_INTERRUPTIBLE;
 		schedule_timeout(HZ>>4);
+		current->state = TASK_RUNNING;
 	} while (!signalled());
 
 	return NULL;
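
The explicit reset to TASK_RUNNING after schedule_timeout() (repeated in two later hunks) guards against returning from an interrupted sleep while still marked TASK_INTERRUPTIBLE. The normalized pattern, as a standalone sketch:

	static void short_nap_sketch(void)
	{
		current->state = TASK_INTERRUPTIBLE;
		schedule_timeout(HZ >> 4);	/* ~60 ms at HZ=100, or until woken */
		current->state = TASK_RUNNING;	/* defensive: never keep running
						 * in the INTERRUPTIBLE state */
	}
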
@@ -599,20 +802,24 @@
 		kfree(buffer);
 		return;
 	}
-	swap_buffer_used = 0;
+	rpc_unlock_swapbuf();
 }
 
 /*
  * Creation and deletion of RPC task structures
  */
-inline void
+void
 rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt,
 				rpc_action callback, int flags)
 {
 	memset(task, 0, sizeof(*task));
+	init_timer(&task->tk_timer);
+	task->tk_timer.data     = (unsigned long) task;
+	task->tk_timer.function = (void (*)(unsigned long)) rpc_run_timer;
+
 	task->tk_client = clnt;
-	task->tk_flags  = RPC_TASK_RUNNING | flags;
 	task->tk_exit   = callback;
+	task->tk_flags  = flags;
 	if (current->uid != current->fsuid || current->gid != current->fsgid)
 		task->tk_flags |= RPC_TASK_SETUID;
 
@@ -621,6 +828,11 @@
 	task->tk_cred_retry = 2;
 	task->tk_suid_retry = 1;
 
+#ifdef RPC_DEBUG
+	task->tk_magic = 0xf00baa;
+	task->tk_pid = rpc_task_id++;
+#endif
+
 	/* Add to global list of all tasks */
 	task->tk_next_task = all_tasks;
 	task->tk_prev_task = NULL;
@@ -629,14 +841,11 @@
 	all_tasks = task;
 
 	if (clnt)
-		clnt->cl_users++;
+		atomic_inc(&clnt->cl_users);
 
-#ifdef RPC_DEBUG
-	task->tk_magic = 0xf00baa;
-	task->tk_pid = rpc_task_id++;
-#endif
-	dprintk("RPC: %4d new task procpid %d\n", task->tk_pid,
-				current->pid);
+	dprintk("RPC: %d new task procpid %d%s\n",
+			task->tk_pid, current->pid,
+			(flags & RPC_TASK_DYNAMIC) ? " (alloc)" : "");
 }
 
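
cl_users becomes an atomic_t, so the client reference count no longer relies on interrupt masking; the rpc_new_task() failure path below makes the same change. The underlying idiom, sketched with a hypothetical struct name:

	#include <asm/atomic.h>

	struct clnt_sketch {
		atomic_t cl_users;
	};

	static inline void clnt_get(struct clnt_sketch *c)
	{
		atomic_inc(&c->cl_users);	/* lock-free reference */
	}

	static inline int clnt_put(struct clnt_sketch *c)
	{
		return atomic_dec_and_test(&c->cl_users); /* 1 when last ref dropped */
	}
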
 /*
@@ -653,10 +862,7 @@
 	if (!task)
 		goto cleanup;
 
-	rpc_init_task(task, clnt, callback, flags);
-
-	dprintk("RPC: %4d allocated task\n", task->tk_pid);
-	task->tk_flags |= RPC_TASK_DYNAMIC;
+	rpc_init_task(task, clnt, callback, flags | RPC_TASK_DYNAMIC);
 out:
 	return task;
 
@@ -664,8 +870,8 @@
 	/* Check whether to release the client */
 	if (clnt) {
 		printk("rpc_new_task: failed, users=%d, oneshot=%d\n",
-			clnt->cl_users, clnt->cl_oneshot);
-		clnt->cl_users++; /* pretend we were used ... */
+			atomic_read(&clnt->cl_users), clnt->cl_oneshot);
+		atomic_inc(&clnt->cl_users); /* pretend we were used ... */
 		rpc_release_client(clnt);
 	}
 	goto out;
@@ -675,9 +881,19 @@
 rpc_release_task(struct rpc_task *task)
 {
 	struct rpc_task	*next, *prev;
+	unsigned long	oldflags;
 
 	dprintk("RPC: %4d release task\n", task->tk_pid);
 
+#ifdef RPC_DEBUG
+	if (task->tk_magic != 0xf00baa) {
+		printk(KERN_ERR "RPC: attempt to release a non-existing task!\n");
+		rpc_debug = ~0;
+		rpc_show_tasks();
+		return;
+	}
+#endif
+
 	/* Remove from global task list */
 	prev = task->tk_prev_task;
 	next = task->tk_next_task;
@@ -687,12 +903,29 @@
 		prev->tk_next_task = next;
 	else
 		all_tasks = next;
+	task->tk_next_task = task->tk_prev_task = NULL;
+
+	/* Protect the execution below. */
+	spin_lock_irqsave(&rpc_queue_lock, oldflags);
+
+	/* Disable timer to prevent zombie wakeup */
+	__rpc_disable_timer(task);
+
+	/* Remove from any wait queue we're still on */
+	__rpc_remove_wait_queue(task);
+
+	task->tk_active = 0;
+
+	spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
+
+	/* Synchronously delete any running timer */
+	rpc_delete_timer(task);
 
 	/* Release resources */
 	if (task->tk_rqstp)
 		xprt_release(task);
-	if (task->tk_cred)
-		rpcauth_releasecred(task);
+	if (task->tk_msg.rpc_cred)
+		rpcauth_unbindcred(task);
 	if (task->tk_buffer) {
 		rpc_free(task->tk_buffer);
 		task->tk_buffer = NULL;
@@ -708,9 +941,12 @@
 
 	if (task->tk_flags & RPC_TASK_DYNAMIC) {
 		dprintk("RPC: %4d freeing task\n", task->tk_pid);
+		if (task->tk_release)
+			task->tk_release(task);
 		task->tk_flags &= ~RPC_TASK_DYNAMIC;
 		rpc_free(task);
-	}
+	} else if (task->tk_release)
+			task->tk_release(task);
 }
 
 /*
@@ -719,14 +955,18 @@
  * parent task may already have gone away
  */
 static inline struct rpc_task *
-rpc_find_parent(struct rpc_task *child)
+__rpc_find_parent(struct rpc_task *child)
 {
-	struct rpc_task	*temp, *parent;
+	struct rpc_task	*head, *parent;
 
-	parent = (struct rpc_task *) child->tk_calldata;
-	for (temp = childq.task; temp; temp = temp->tk_next) {
-		if (temp == parent)
-			return parent;
+	parent = child->tk_parent;
+	if ((head = childq.task) != NULL) {
+		struct rpc_task	*task = head;
+		do {
+			if (task == parent)
+				return parent;
+			task = task->tk_next;
+		} while (task != head);
 	}
 	return NULL;
 }
@@ -735,12 +975,14 @@
 rpc_child_exit(struct rpc_task *child)
 {
 	struct rpc_task	*parent;
+	unsigned long	oldflags;
 
-	if ((parent = rpc_find_parent(child)) != NULL) {
+	spin_lock_irqsave(&rpc_queue_lock, oldflags);
+	if ((parent = __rpc_find_parent(child)) != NULL) {
 		parent->tk_status = child->tk_status;
-		rpc_wake_up_task(parent);
+		__rpc_wake_up_task(parent);
 	}
-	rpc_release_task(child);
+	spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
 }
 
 /*
@@ -755,7 +997,7 @@
 	if (!task)
 		goto fail;
 	task->tk_exit = rpc_child_exit;
-	task->tk_calldata = parent;
+	task->tk_parent = parent;
 	return task;
 
 fail:
@@ -766,13 +1008,13 @@
 void
 rpc_run_child(struct rpc_task *task, struct rpc_task *child, rpc_action func)
 {
-	unsigned long oldflags;
+	unsigned long	oldflags;
 
-	save_flags(oldflags); cli();
-	rpc_make_runnable(child);
-	restore_flags(oldflags);
+	spin_lock_irqsave(&rpc_queue_lock, oldflags);
 	/* N.B. Is it possible for the child to have already finished? */
-	rpc_sleep_on(&childq, task, func, NULL);
+	__rpc_sleep_on(&childq, task, func, NULL);
+	__rpc_schedule_run(child);
+	spin_unlock_irqrestore(&rpc_queue_lock, oldflags);
 }
 
 /*
@@ -785,8 +1027,9 @@
 	struct rpc_task	**q, *rovr;
 
 	dprintk("RPC:      killing all tasks for client %p\n", clnt);
-	/* N.B. Why bother to inhibit? Nothing blocks here ... */
-	rpc_inhibit++;
+	/*
+	 * Spin lock all_tasks to prevent changes...
+	 */
 	for (q = &all_tasks; (rovr = *q); q = &rovr->tk_next_task) {
 		if (!clnt || rovr->tk_client == clnt) {
 			rovr->tk_flags |= RPC_TASK_KILLED;
@@ -794,11 +1037,16 @@
 			rpc_wake_up_task(rovr);
 		}
 	}
-	rpc_inhibit--;
 }
 
 static struct semaphore rpciod_running = MUTEX_LOCKED;
 
+static inline int
+rpciod_task_pending(void)
+{
+	return schedq.task != NULL || xprt_tcp_pending();
+}
+
 /*
  * This is the rpciod kernel thread
  */
@@ -806,11 +1054,12 @@
 rpciod(void *ptr)
 {
 	struct wait_queue **assassin = (struct wait_queue **) ptr;
-	unsigned long	oldflags;
 	int		rounds = 0;
 
 	MOD_INC_USE_COUNT;
+
 	lock_kernel();
+
 	/*
 	 * Let our maker know we're running ...
 	 */
@@ -837,22 +1086,17 @@
 		}
 		__rpc_schedule();
 
-		if (++rounds >= 64) {	/* safeguard */
+		if (++rounds >= 64 || current->need_resched) {	/* safeguard */
 			schedule();
 			rounds = 0;
 		}
-		save_flags(oldflags); cli();
-		dprintk("RPC: rpciod running checking dispatch\n");
-		rpciod_tcp_dispatcher();
 
-		if (!schedq.task) {
+		if (!rpciod_task_pending()) {
 			dprintk("RPC: rpciod back to sleep\n");
-			interruptible_sleep_on(&rpciod_idle);
+			wait_event_interruptible(rpciod_idle, rpciod_task_pending());
 			dprintk("RPC: switch to rpciod\n");
-			rpciod_tcp_dispatcher();
 			rounds = 0;
 		}
-		restore_flags(oldflags);
 	}
 
 	dprintk("RPC: rpciod shutdown commences\n");
@@ -882,6 +1126,7 @@
 			dprintk("rpciod_killall: waiting for tasks to exit\n");
 			current->state = TASK_INTERRUPTIBLE;
 			schedule_timeout(1);
+			current->state = TASK_RUNNING;
 		}
 	}
 
@@ -953,6 +1198,7 @@
 	current->sigpending = 0;
 	current->state = TASK_INTERRUPTIBLE;
 	schedule_timeout(1);
+	current->state = TASK_RUNNING;
 	/*
 	 * Display a message if we're going to wait longer.
 	 */
@@ -972,36 +1218,23 @@
 	MOD_DEC_USE_COUNT;
 }
 
-#ifdef RPC_DEBUG
-#include <linux/nfs_fs.h>
 void rpc_show_tasks(void)
 {
 	struct rpc_task *t = all_tasks, *next;
-	struct nfs_wreq *wreq;
 
-	if (!t)
+	t = all_tasks;
+	if (!t) {
 		return;
+	}
 	printk("-pid- proc flgs status -client- -prog- --rqstp- -timeout "
 		"-rpcwait -action- --exit--\n");
 	for (; t; t = next) {
 		next = t->tk_next_task;
 		printk("%05d %04d %04x %06d %8p %6d %8p %08ld %8s %8p %8p\n",
-			t->tk_pid, t->tk_proc, t->tk_flags, t->tk_status,
+			t->tk_pid, t->tk_msg.rpc_proc, t->tk_flags, t->tk_status,
 			t->tk_client, t->tk_client->cl_prog,
 			t->tk_rqstp, t->tk_timeout,
 			t->tk_rpcwait ? rpc_qname(t->tk_rpcwait) : " <NULL> ",
 			t->tk_action, t->tk_exit);
-
-		if (!(t->tk_flags & RPC_TASK_NFSWRITE))
-			continue;
-		/* NFS write requests */
-		wreq = (struct nfs_wreq *) t->tk_calldata;
-		printk("     NFS: flgs=%08x, pid=%d, pg=%p, off=(%d, %d)\n",
-			wreq->wb_flags, wreq->wb_pid, wreq->wb_page,
-			wreq->wb_offset, wreq->wb_bytes);
-		printk("          name=%s/%s\n",
-			wreq->wb_file->f_dentry->d_parent->d_name.name,
-			wreq->wb_file->f_dentry->d_name.name);
 	}
 }
-#endif
