Iterator-based Microthreading

Back in May, I was wrapping PhyreEngine and porting the samples to C#. To extend one of them and demonstrate some of C#'s capabilities, Miguel and I decided to use simple iterator-based microthreading, which simulates multithreading by running many microthreads within a single real thread. Unity does something similar in its game engine. It enables a very imperative style of coding, as if each task had its own dedicated thread, but without anywhere near the overhead of real threads.

Here's the usage example we initially came up with that drove my design of the microthread scheduler:

public class Enemy1
{
	public void Init (Scheduler scheduler)
	{
		scheduler.AddTask (Patrol ());
	}
 
	IEnumerable Patrol ()
	{
		while (alive){
			if (CanSeeTarget ()) {
				yield return Attack ();
			} else if (InReloadStation){
				Signal signal = AnimateReload ();
				yield return signal;
			} else {
				MoveTowardsNextWayPoint ();
				yield return TimeSpan.FromSeconds (1);
			}
		}
		yield break;
	}
 
	IEnumerable Attack ()
	{
		while (TargetAlive && CanSeeTarget ()){
			AimAtTarget ();
			Fire ();
			yield return TimeSpan.FromSeconds (2);
		}
	}
}

The concept is fairly simple. The C# compiler magically transforms this code into an IEnumerator state machine (returned by IEnumerable.GetEnumerator()). Each time IEnumerator.MoveNext() is called, your method "iterates": it runs to the next yield statement, sets the Current property to the value yielded, and keeps enough state that next time it iterates, it can effectively resume where it left off. We can yield nulls to give other microthreads a chance to run, or yield objects to tell the scheduler other things. For example, yielding a TimeSpan could cause the microthread to sleep for that long.

interface IEnumerator
{
	bool MoveNext ();
	object Current { get; }
}

As you can see, although C# iterators are primarily intended for iterating over collections, yield can effectively act like a microthread cooperatively yielding control. Your method runs until it yields, later resumes from that point, runs to the next yield, and so on.
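To make that concrete, here's a small self-contained sketch (the names are mine, not part of the scheduler) that pumps an iterator by hand, exactly the way the scheduler will:

```csharp
using System;
using System.Collections;
using System.Collections.Generic;

static class IteratorDemo
{
	//a method that "runs" in three steps, pausing at each yield
	static IEnumerator Steps (List<string> log)
	{
		log.Add ("step 1");
		yield return null;                     //just yield control
		log.Add ("step 2");
		yield return TimeSpan.FromSeconds (1); //a value a scheduler could interpret
		log.Add ("step 3");
	}

	public static List<string> Pump ()
	{
		var log = new List<string> ();
		IEnumerator t = Steps (log);
		//each MoveNext runs the method to its next yield
		while (t.MoveNext ())
			log.Add ("yielded: " + (t.Current ?? "(null)"));
		return log;
	}
}
```

Each MoveNext() call runs Steps to its next yield, so the log interleaves the method's own output with the values it yielded.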

The class that enumerates your iterator is the scheduler. Before we get to that, we'll cover the boring bits to set some groundwork. First, we need a class to encapsulate the task. This holds the IEnumerator and the Scheduler it belongs to, acts as a singly linked list node, and has a field for arbitrary data we'll use later.

//tasks may move between lists but they may only be in one list at a time
internal class TaskItem
{
	public readonly IEnumerator Task;
	public TaskItem Next;
	public Scheduler Scheduler;
	public long Data;
 
	public TaskItem (IEnumerator task, Scheduler scheduler)
	{
		this.Task = task;
		this.Scheduler = scheduler;
	}
}

Next we need a simple linked list for keeping lists of TaskItem. We're not using LinkedList<T>; this is much simpler, does only what we need, and makes it easy to move tasks between lists and remove them via the enumerator.

internal sealed class TaskList
{
	public readonly Scheduler Scheduler;
 
	public TaskItem First { get; private set; }
	public TaskItem Last { get; private set; }
 
	public TaskList (Scheduler scheduler)
	{
		this.Scheduler = scheduler;
	}
 
	public void Append (TaskItem task)
	{
		Debug.Assert (task.Next == null);
		if (First == null) {
			Debug.Assert (Last == null);
			First = Last = task;
		} else {
			Debug.Assert (Last.Next == null);
			Last.Next = task;
			Last = task;
		}
	}
 
	public void Remove (TaskItem task, TaskItem previous)
	{
		if (previous == null) {
			Debug.Assert (task == First);
			First = task.Next;
		} else {
			Debug.Assert (previous.Next == task);
			previous.Next = task.Next;
		}
 
		if (task.Next == null) {
			Debug.Assert (Last == task);
			Last = previous;
		}
		task.Next = null;
	}
 
	public TaskEnumerator GetEnumerator ()
	{
		return new TaskEnumerator (this);
	}
 
	public sealed class TaskEnumerator
	{
		TaskList list;
		TaskItem current, previous;
 
		public TaskEnumerator (TaskList list)
		{
			this.list = list;
			previous = current = null;
		}
 
		public TaskItem Current { get { return current; } }
 
		public bool MoveNext ()
		{
			TaskItem next;
			if (current == null) {
				if (previous == null)
					next = list.First;
				else
					next = previous.Next;
			} else {
				next = current.Next;
			}
 
			if (next != null) {
				if (current != null)
					previous = current;
				current = next;
				return true;
			}
			return false;
		}
 
		public void MoveCurrentToList (TaskList otherList)
		{
			otherList.Append (RemoveCurrent ());
		}
 
		public TaskItem RemoveCurrent ()
		{
			Debug.Assert (current != null);
			TaskItem ret = current;
			list.Remove (current, previous);
			current = null;
			return ret;
		}
	}
}

Now we can implement the scheduler. Using it is simple: you register tasks with AddTask(IEnumerator), then call Run() to run all the active tasks for one yield iteration each. It handles sleeping and waking tasks, and removes tasks once they've finished.

public sealed class Scheduler
{
	TaskList active, sleeping;
 
	public Scheduler ()
	{
		active = new TaskList (this);
		sleeping = new TaskList (this);
	}
 
	public void AddTask (IEnumerator task)
	{
		active.Append (new TaskItem (task, this));
	}
 
	public void Run ()
	{
		//cache this, it's expensive to access DateTime.Now
		long nowTicks = DateTime.Now.Ticks;
 
		//move woken tasks back into the active list
		var en = sleeping.GetEnumerator ();
		while (en.MoveNext ())
			if (en.Current.Data < nowTicks)
				en.MoveCurrentToList (active);
 
		//run all the active tasks
		en = active.GetEnumerator ();
		while (en.MoveNext ()) {
			//run each task's enumerator for one yield iteration
			IEnumerator t = en.Current.Task;
			if (!t.MoveNext ()) {
				//it finished, so remove it
				en.RemoveCurrent ();
				continue;
			}
 
			//check the current state
			object state = t.Current;
			if (state == null) {
				//it's just cooperatively yielding, state unchanged 
				continue;
		} else if (state is TimeSpan) {
				//it wants to sleep, so move it to the sleeping list. we use the Data field for the wakeup time
				en.Current.Data = nowTicks + ((TimeSpan)state).Ticks;
				en.MoveCurrentToList (sleeping);
			} else if (state is IEnumerable) {
				throw new NotImplementedException ("Nested tasks are not supported yet");
			} else {
				throw new InvalidOperationException ("Unknown task state returned: " + state.GetType ().FullName);
			}
		}
	}
 
	internal void AddToActive (TaskItem task)
	{
		active.Append (task);
	}
}
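Before moving on, here's a self-contained toy version of the same idea (round-robin only, no sleeping or signals; all the names are illustrative, not the real API) that shows how a Run-style loop interleaves tasks one yield at a time:

```csharp
using System;
using System.Collections;
using System.Collections.Generic;

//a stripped-down stand-in for the scheduler above, just to show the pumping pattern
sealed class MiniScheduler
{
	readonly List<IEnumerator> tasks = new List<IEnumerator> ();

	public void AddTask (IEnumerator task)
	{
		tasks.Add (task);
	}

	//runs each task for one yield iteration, dropping tasks that finished.
	//RemoveAll calls the predicate once per element, in order.
	public bool Run ()
	{
		tasks.RemoveAll (t => !t.MoveNext ());
		return tasks.Count > 0;
	}
}

static class MiniSchedulerDemo
{
	static IEnumerator Count (List<string> log, string name, int n)
	{
		for (int i = 1; i <= n; i++) {
			log.Add (name + i);
			yield return null;
		}
	}

	public static List<string> RunDemo ()
	{
		var log = new List<string> ();
		var sched = new MiniScheduler ();
		sched.AddTask (Count (log, "a", 2));
		sched.AddTask (Count (log, "b", 3));
		while (sched.Run ()) { }
		return log;
	}
}
```

Running the demo logs "a1, b1, a2, b2, b3": the two tasks alternate each time around the loop, and each one drops out when its iterator finishes.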

At this point, it looks fairly useful, but let's add a simple synchronization primitive too. Microthreads should never make long blocking calls because they can't be pre-empted. Instead, we're going to let the microthread obtain and yield a signal object, which means it will not be scheduled until the signal has been set. Instead of using blocking APIs, you can use async APIs, create a signal object, wait on that, and have the callback set the signal. Or, thinking back to the initial game example, some game object's controlling microthread might want to sleep until another game object reaches a certain state; the target object can keep a signal accessible via a property that microthreads can read and wait on.

A microthread can wait for more than one signal, and more than one microthread can wait for a signal. Essentially we're going to have an equivalent of .NET's AutoResetEvent and WaitHandle.WaitAll(WaitHandle[]).

The signal's job is to keep a list of all the tasks that are waiting for it. When tasks start waiting, they move out of the scheduler's lists and are tracked by all the signals instead. Each signal increments/decrements the task's Data field to keep track of how many signals the task is waiting for. When the count reaches zero, the task can be moved back to the scheduler.

public class Signal
{
	static int nextId = int.MinValue;
 
	int id = nextId++;
	List<TaskItem> tasks = new List<TaskItem> ();
	bool isSet = true;
 
	public void Set ()
	{
		if (isSet)
			return;
		isSet = true;
		//decrement the wait count of all tasks waiting for this signal
		foreach (TaskItem task in tasks)
			if (--task.Data == 0)
				//if the wait count is zero, the task isn't waiting for any more signals, so re-schedule it
				task.Scheduler.AddToActive (task);
		tasks.Clear ();
	}
 
	internal void Add (TaskItem task)
	{
		//signal only becomes unset when it has tasks
		if (isSet)
			isSet = false;
		//the signal keeps a list of tasks that are waiting for it
		tasks.Add (task);
		//use the task's data for tracking the number of signals it's still waiting for
		task.Data++;
	}
}

And we need to add a couple more checks to Scheduler.Run()'s state handling, so that when a task yields a signal or a collection/array of signals, it's moved from the scheduler's lists into the signals' lists.

} else if (state is Signal) {
	TaskItem task = en.RemoveCurrent ();
	task.Data = 0;
	((Signal)state).Add (task);
} else if (state is ICollection<Signal>) {
	TaskItem task = en.RemoveCurrent ();
	task.Data = 0;
	foreach (Signal s in ((ICollection<Signal>)state))
		s.Add (task);
}
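For example, a microthread can now suspend itself on an async operation (a sketch against the classes above; StartAsyncDownload and UseDownloadedData are hypothetical placeholders for whatever async API you're wrapping):

```csharp
Signal downloadDone = new Signal ();

IEnumerator FetchLevelData ()
{
	//kick off the async operation; its completion callback sets the signal
	StartAsyncDownload (levelUrl, () => downloadDone.Set ());

	//the scheduler won't run this microthread again until the signal is set
	yield return downloadDone;

	UseDownloadedData ();
}
```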

And, there it is.

I have a few more ideas to improve this and turn it into a real library:

  • Implement a Signal that's a ManualResetEvent analogue
  • Use an IEnumerable<ThreadState> instead of plain IEnumerable, where ThreadState is a union struct with an enum specifying its type. This could avoid the boxing of TimeSpan and the type checks and casts in the scheduler: just switch on the enum value. It would also prevent consumers from yielding a zero TimeSpan instead of null.
  • Dispose any disposable task IEnumerators
  • Implement task priorities, probably using different lists
  • Tidy up the accessibility and API a bit
  • Add a Scheduler.Run overload that only runs some number of iterations, not the whole list.
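The ThreadState idea in the second bullet might look something like this (a speculative sketch covering only yield and sleep; the names are assumptions, and a real version would add cases for signals and nested tasks):

```csharp
using System;

public enum TaskStateType { Yield, Sleep }

//a union-style struct: the enum says which field is valid, so the scheduler
//does no boxing and no is/cast chain, just a switch on Type
public struct TaskState
{
	public readonly TaskStateType Type;
	public readonly TimeSpan Sleep; //only valid when Type == TaskStateType.Sleep

	TaskState (TaskStateType type, TimeSpan sleep)
	{
		this.Type = type;
		this.Sleep = sleep;
	}

	public static readonly TaskState Yield = new TaskState (TaskStateType.Yield, TimeSpan.Zero);

	public static TaskState SleepFor (TimeSpan duration)
	{
		return new TaskState (TaskStateType.Sleep, duration);
	}
}
```

A microthread would then be an IEnumerable&lt;TaskState&gt; yielding TaskState.Yield or TaskState.SleepFor(...), and since TaskStateType.Yield is the zero value, a default-constructed state still means "just yield".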

Some people may wonder why I haven't mentioned interactions with real threads. If the scheduler were thread-aware, you could have multiple real threads on different cores consuming the microthread queue, which would be faster on multiple cores and avoid blocking on slow tasks. The problem is not just that this increases the complexity, but that all the microthreads would have to be aware of threading too, and would need to lock every object they touched, and so on. This scheduler is meant to run in the equivalent of the GUI thread, purely to enable driving high-level game logic (and similar things) with an intuitive thread-like imperative programming model, and minimal overhead. If it doesn't fit easily on one core you're probably using it for the wrong thing. There might be cases where it's useful to create multiple Schedulers and run them in different threads, but be careful about what the microthreads touch.

This is part of the Catchup 2010 series of posts.

Comments

Hey this is awesome stuff Michael.

I reckon it would be a good idea to separate this out and turn this into a mini project.
A benchmark comparing a trivial example against a threaded version would be sweet as it would be interesting to know how much faster this approach is.

Do you think this approach is a good idea for running a socket server capable of running multiple long-running tasks?
Basically, would it be beneficial to have the core socket server microthreaded, handling socket connections, then be able to spawn new tasks on a thread pool, then when a task is finished, call back into the microthreaded socket server to handle the response?

This approach works great for socket servers, since it allows you to use nonblocking operations to wait on individual connections without having to write lots of callbacks and do complex error handling - you can just use a microthread (or multiple microthreads) per connection to handle things and get error propagation through your iterators.

Here's an example telnet chat server written using a similar microthreading library (shameless plug):
http://code.google.com/p/fracture/source/browse/#svn/trunk/Squared/Examples/TelnetChatServer

It uses .NET's built in nonblocking IO support to wait on connections, so you can connect hundreds of clients to it without any significant memory/CPU overhead. Using microthreads makes it really easy to handle everything since you don't have to deal with synchronization.

Hey, you have an interesting project there.
Its use of async IO is similar to that of http://chat.nodejs.org although nodejs seems to work solely off timers and IO events+callbacks.
It would definitely be interesting to know how each approach fares and which would perform and scale better.
I think it will be a close one since it uses google's V8 javascript engine at its core.

In theory it should work very well as long as you use async socket API. On a similar topic, Mono's ASP.NET pipeline is actually built on iterators, and Miguel blogged about it some years ago.

All the source is contained within the post, but I'm reluctant to put it in one place as a library until it's a little more mature.

Microthreads don't actually have to be synchronization aware. If you marshal all signal responses to a single thread as tasks, you get synchronization for free. I use this technique in my Squared.Task microthreading library and it works great.

For interactive applications you can even cheat by using the Win32 message pump as your scheduler via a hidden window, eliminating the need to pump your scheduler or synchronize with a separate ui thread.

Interesting project you have there!

Yes, the key point for keeping threading/synchronization simple for the microthreads themselves is that all the objects touched by the microthreads have to be safe for access from the microthreads' thread. There are certainly a few ways to do this. Though I'm not familiar with Win32 and don't understand the need for a hidden window, if using GTK# I'd likely drive it via an idle handler or timeout in the GTK mainloop, and keep most stuff in the GUI thread.

That is all.

On a side-note; this is wildly fascinating as a state machine - never mind the microthreading.

[quote=Michael]decided to use simple iterator-based microthreading, which simulates multithreading but with many microthreads within a single real thread.[/quote]

http://en.wikipedia.org/wiki/Coroutine

Right, I know that the iterators are a form of coroutine, but the focus here is how they're used. I'm curious why your comment is anonymous.

Looks really cool. Just look at http://www.neilmix.com/2007/02/07/threading-in-javascript-17/ for more ideas :)

Could you please provide a simple/basic example how to use it?
The example from the top of the post wouldn't compile, as scheduler.AddTask() requires an IEnumerator while Patrol() is an IEnumerable?

TIA,
Dejan

Sorry, I meant to start a new thread and not a reply to olegzzzz.

LP,
Dejan

Right, the Scheduler didn't quite match our initial use-case design. There are a few ways to make it work:

* Get the enumerator from the Patrol IEnumerable:

scheduler.AddTask (Patrol ().GetEnumerator ());

* Have an overload of Scheduler.AddTask that gets the Enumerator from an IEnumerable:

public void AddTask (IEnumerable task)
{
	AddTask (task.GetEnumerator ());
}

* Have Patrol() return IEnumerator instead of IEnumerable. The C# compiler can generate that directly, and this will also save you an object allocation.

IEnumerator Patrol ()
{
	//foo
}

Thanks.


Very interesting, out-of-the-box solution.

I was wondering, have you measured whether the Run() method touches the heap (assuming the microthreads do not allocate memory)?

The scheduler will read and alter quite a few heap objects because each task is encapsulated in a TaskItem object. Allocations are much more limited. I haven't measured it but it's fairly straightforward. At a quick scan, the heap object creations are as follows:

In the scheduler:
* Adding a task will create a TaskItem object
* Each call to Run() will create 2 list enumerators
* Moving tasks to signals might allocate arrays for the signals' lists

In the threads:
* Yielded TimeSpan structs will be boxed, i.e. creating heap objects
* Yielded signals might have to be created by whatever API the thread gets them from

To cut down allocations you could eliminate the list enumerators and walk the lists directly in Run(), or implement a Reset() method on the enumerators and re-use them. You could eliminate the boxing of TimeSpan structs by using IEnumerable<SomeTaskStateStruct> or implementing some kind of re-usable box object. The others are unavoidable.
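For instance, the first of those could be done by walking the list directly with an explicit previous pointer instead of allocating a TaskEnumerator (a sketch against the TaskList above, showing only the remove-on-finish case):

```csharp
TaskItem previous = null, current = active.First;
while (current != null) {
	//capture Next before any removal, since Remove clears it
	TaskItem next = current.Next;
	if (!current.Task.MoveNext ()) {
		//finished: unlink it. previous stays where it is
		active.Remove (current, previous);
	} else {
		previous = current;
	}
	current = next;
}
```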

Did you have a look at this lib from Jon "yoda" Skeet? http://www.yoda.arachsys.com/csharp/miscutil/ ;-)

Nice!

By the way, it is possible to save the compiler-generated state machine to disk, and then resume it days or weeks later from the disk file. I posted a proof of concept here. But, unfortunately, it's not really suitable for production use because Microsoft say that they may change the way the state machine works, which could make it impossible to re-load the saved file after upgrading to a new version of the runtime.

So I guess that non-persistent cases, like your nice one above, are the way to go for now.

John

I have been doing a very similar thing, influenced by the CCR iterators and J. Richter's AsyncEnumerator.
I use multiple OS threads to execute a number of microthreads, so some operations, like putting a microthread into the "ready list" or accessing microthreading sync primitives, require real thread sync. What is most promising is integrating such microthreading with APM-based or any other async API. I've tried it and it works perfectly.
Are you going to continue developing this as a library/framework?