Back in May, I was wrapping PhyreEngine and porting the samples to C#. To extend one of them and demonstrate some of the capabilities of C#, Miguel and I decided to use simple iterator-based microthreading, which simulates multithreading by running many microthreads within a single real thread. Unity does something similar in its game engine too. It lets you write code in a very imperative style, as if each task had its own dedicated thread, but without anywhere near the overhead of real threads.
Here's the usage example we initially came up with, which drove my design of the microthread scheduler:
```csharp
using System;
using System.Collections;

public class Enemy1
{
    // alive, TargetAlive, InReloadStation, CanSeeTarget(), AnimateReload(),
    // MoveTowardsNextWayPoint(), AimAtTarget() and Fire() are members of
    // the game object and aren't shown here
    public void Init (Scheduler scheduler)
    {
        scheduler.AddTask (Patrol ());
    }

    IEnumerable Patrol ()
    {
        while (alive) {
            if (CanSeeTarget ()) {
                yield return Attack ();
            } else if (InReloadStation) {
                Signal signal = AnimateReload ();
                yield return signal;
            } else {
                MoveTowardsNextWayPoint ();
                yield return TimeSpan.FromSeconds (1);
            }
        }
        yield break;
    }

    IEnumerable Attack ()
    {
        while (TargetAlive && CanSeeTarget ()) {
            AimAtTarget ();
            Fire ();
            yield return TimeSpan.FromSeconds (2);
        }
    }
}
```
The concept is fairly simple. The C# compiler magically transforms this code into an IEnumerator state machine (returned by IEnumerable.GetEnumerator()). Each time IEnumerator.MoveNext() is called, your method "iterates": it runs to the next yield statement, sets the Current property to the value yielded, and keeps enough state that next time it iterates, it can effectively resume where it left off. We can yield nulls to give other microthreads a chance to run, or yield objects to tell the scheduler other things. For example, yielding a TimeSpan could cause the microthread to sleep for that long.
```csharp
interface IEnumerator
{
    bool MoveNext ();
    object Current { get; }
    // the real System.Collections.IEnumerator also has void Reset ();
}
```
As you can see, although C# iterators are primarily intended for iterating through collections, the yield keyword can also become effectively something like a microthread cooperatively yielding. Your method runs until it yields, then it later resumes from this point, runs to the next yield, and so on.
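To make that concrete, here's a minimal sketch that drives a compiler-generated iterator by hand, the same way a scheduler would. The IteratorDemo class and its Walk and Drive methods are hypothetical names for illustration; the only real API involved is System.Collections.IEnumerator.

```csharp
using System;
using System.Collections;
using System.Collections.Generic;

public static class IteratorDemo
{
    // A hypothetical microthread body. Each yield hands control back to
    // whoever called MoveNext(), and sets Current to the yielded value.
    public static IEnumerator Walk (List<string> log)
    {
        log.Add ("step 1");
        yield return null;                      // plain cooperative yield
        log.Add ("step 2");
        yield return TimeSpan.FromSeconds (1);  // a scheduler could treat this as "sleep"
        log.Add ("step 3");
        // falling off the end makes the next MoveNext() return false
    }

    // Steps the iterator to completion, recording what each yield produced.
    public static List<string> Drive ()
    {
        var log = new List<string> ();
        IEnumerator task = Walk (log);
        while (task.MoveNext ()) {
            object state = task.Current;
            log.Add (state == null ? "yielded null" : "yielded " + state.GetType ().Name);
        }
        log.Add ("finished");
        return log;
    }
}
```

Nothing in Walk runs until the first MoveNext() call, and each later call only advances it as far as the next yield.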
The class that enumerates your iterator is the scheduler. Before we get to that, we'll cover the boring bits to lay some groundwork. First, we need a class to encapsulate the task. This holds the IEnumerator and the Scheduler it belongs to, is a singly linked list node, and has a field for arbitrary data we'll use later.
```csharp
using System.Collections;

//tasks may move between lists but they may only be in one list at a time
internal class TaskItem
{
    public readonly IEnumerator Task;
    public TaskItem Next;
    public Scheduler Scheduler;
    //wake-up time in ticks while sleeping, or the number of signals still awaited while waiting
    public long Data;

    public TaskItem (IEnumerator task, Scheduler scheduler)
    {
        this.Task = task;
        this.Scheduler = scheduler;
    }
}
```
Next we need a simple linked list for keeping lists of TaskItem. We're not using LinkedList<T>; this is much simpler, does only what we need, and makes it easy to move tasks between lists and remove them via the enumerator.
```csharp
using System.Diagnostics;

internal sealed class TaskList
{
    public readonly Scheduler Scheduler;
    public TaskItem First { get; private set; }
    public TaskItem Last { get; private set; }

    public TaskList (Scheduler scheduler)
    {
        this.Scheduler = scheduler;
    }

    public void Append (TaskItem task)
    {
        Debug.Assert (task.Next == null);
        if (First == null) {
            Debug.Assert (Last == null);
            First = Last = task;
        } else {
            Debug.Assert (Last.Next == null);
            Last.Next = task;
            Last = task;
        }
    }

    //previous must be the node before task, or null if task is First
    public void Remove (TaskItem task, TaskItem previous)
    {
        if (previous == null) {
            Debug.Assert (task == First);
            First = task.Next;
        } else {
            Debug.Assert (previous.Next == task);
            previous.Next = task.Next;
        }
        if (task.Next == null) {
            Debug.Assert (Last == task);
            Last = previous;
        }
        task.Next = null;
    }

    public TaskEnumerator GetEnumerator ()
    {
        return new TaskEnumerator (this);
    }

    public sealed class TaskEnumerator
    {
        TaskList list;
        TaskItem current, previous;

        public TaskEnumerator (TaskList list)
        {
            this.list = list;
            previous = current = null;
        }

        public TaskItem Current {
            get { return current; }
        }

        public bool MoveNext ()
        {
            TaskItem next;
            if (current == null) {
                //at the start, or the current item was just removed
                if (previous == null)
                    next = list.First;
                else
                    next = previous.Next;
            } else {
                next = current.Next;
            }
            if (next != null) {
                if (current != null)
                    previous = current;
                current = next;
                return true;
            }
            return false;
        }

        public void MoveCurrentToList (TaskList otherList)
        {
            otherList.Append (RemoveCurrent ());
        }

        public TaskItem RemoveCurrent ()
        {
            Debug.Assert (current != null);
            TaskItem ret = current;
            list.Remove (current, previous);
            current = null;
            return ret;
        }
    }
}
```
Now we can implement the scheduler. Using the scheduler is very simple. You register tasks with AddTask(IEnumerator), then call Run() to run all the active tasks for one yield iteration each. It handles sleeping and waking up tasks and removing tasks once they're finished.
```csharp
using System;
using System.Collections;

public sealed class Scheduler
{
    TaskList active, sleeping;

    public Scheduler ()
    {
        active = new TaskList (this);
        sleeping = new TaskList (this);
    }

    public void AddTask (IEnumerator task)
    {
        active.Append (new TaskItem (task, this));
    }

    public void Run ()
    {
        //cache this, it's expensive to access DateTime.Now
        long nowTicks = DateTime.Now.Ticks;

        //move woken tasks back into the active list
        var en = sleeping.GetEnumerator ();
        while (en.MoveNext ())
            if (en.Current.Data < nowTicks)
                en.MoveCurrentToList (active);

        //run all the active tasks
        en = active.GetEnumerator ();
        while (en.MoveNext ()) {
            //run each task's enumerator for one yield iteration
            IEnumerator t = en.Current.Task;
            if (!t.MoveNext ()) {
                //it finished, so remove it
                en.RemoveCurrent ();
                continue;
            }

            //check the current state
            object state = t.Current;
            if (state == null) {
                //it's just cooperatively yielding, state unchanged
                continue;
            } else if (state is TimeSpan) {
                //it wants to sleep, move to the sleeping list. we use the Data field for the wakeup time
                en.Current.Data = nowTicks + ((TimeSpan)state).Ticks;
                en.MoveCurrentToList (sleeping);
            } else if (state is IEnumerable) {
                throw new NotImplementedException ("Nested tasks are not supported yet");
            } else {
                throw new InvalidOperationException ("Unknown task state returned: " + state.GetType ().FullName);
            }
        }
    }

    internal void AddToActive (TaskItem task)
    {
        active.Append (task);
    }
}
```
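To see the run loop's behaviour without the linked-list plumbing, here's a deliberately simplified sketch. It is not the scheduler above: MiniScheduler and MiniSchedulerDemo.Ticker are hypothetical names, tasks live in a plain List<T>, and the current time is passed to Run() as a tick count instead of being read from DateTime.Now, so the clock can be controlled. It only understands null and TimeSpan yields.

```csharp
using System;
using System.Collections;
using System.Collections.Generic;

// Hypothetical, simplified scheduler: a List instead of intrusive linked lists.
public sealed class MiniScheduler
{
    // Each entry pairs a task with the tick count at which it may run again.
    readonly List<(IEnumerator Task, long WakeTicks)> tasks =
        new List<(IEnumerator Task, long WakeTicks)> ();

    public int Count => tasks.Count;

    public void AddTask (IEnumerator task) => tasks.Add ((task, 0L));

    // Runs every task that is awake for one yield iteration.
    public void Run (long nowTicks)
    {
        int i = 0;
        while (i < tasks.Count) {
            var (task, wakeTicks) = tasks[i];
            if (wakeTicks > nowTicks) {       // still sleeping
                i++;
                continue;
            }
            if (!task.MoveNext ()) {          // finished: drop it; index i now holds the next task
                tasks.RemoveAt (i);
                continue;
            }
            object state = task.Current;
            if (state == null) {
                tasks[i] = (task, 0L);        // cooperative yield: runnable next pass
            } else if (state is TimeSpan span) {
                tasks[i] = (task, nowTicks + span.Ticks); // sleep until then
            } else {
                throw new InvalidOperationException ("Unknown task state returned: " + state.GetType ().FullName);
            }
            i++;
        }
    }
}

public static class MiniSchedulerDemo
{
    // Hypothetical task: logs name + step number, then yields the given state, `steps` times.
    public static IEnumerator Ticker (string name, List<string> log, int steps, object state)
    {
        for (int i = 0; i < steps; i++) {
            log.Add (name + i);
            yield return state;
        }
    }
}
```

The simplification has a cost: List<T>.RemoveAt is O(n) for every finished task, which the intrusive TaskList above avoids by unlinking nodes in place.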
At this point, it looks fairly useful, but let's add a simple synchronization primitive too. Microthreads should never make long blocking calls because they can't be pre-empted. Instead, we're going to let the microthread obtain and yield a signal object, which means it will not be scheduled until the signal has been set. Instead of using blocking APIs, you can use async APIs, create a signal object, wait on that, and have the callback set the signal. Or, thinking back to the initial game example, some game object's controlling microthread might want to sleep until another game object reaches a certain state; the target object can keep a signal accessible via a property that microthreads can read and wait on.
A thread can wait for more than one signal, and more than one thread can wait for a signal. Essentially we're going to have something roughly equivalent to .NET's AutoResetEvent and WaitHandle.WaitAll(WaitHandle[]).
The signal's job is to keep a list of all the tasks that are waiting for it. When tasks start waiting, they move out of the scheduler's lists and are tracked by all the signals instead. Each signal increments/decrements the task's Data field to keep track of how many signals the task is waiting for. When the count reaches zero, the task can be moved back to the scheduler.
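The counting scheme can be modelled in isolation with a couple of stand-in classes. In this sketch, which is an assumption for illustration only, WaitingTask stands in for TaskItem (its WaitCount plays the role of Data), CountingSignal mirrors the bookkeeping of a signal, and the scheduler's active list is replaced by a plain List<WaitingTask> of runnable tasks.

```csharp
using System.Collections.Generic;

// Stand-in for TaskItem: WaitCount plays the role of the Data field.
public sealed class WaitingTask
{
    public readonly string Name;
    public int WaitCount;
    public WaitingTask (string name) { Name = name; }
}

// Stand-in for a signal: remembers its waiters and releases them when set.
public sealed class CountingSignal
{
    readonly List<WaitingTask> waiters = new List<WaitingTask> ();
    readonly List<WaitingTask> runnable;   // stands in for the scheduler's active list

    public CountingSignal (List<WaitingTask> runnable) { this.runnable = runnable; }

    public void Add (WaitingTask task)
    {
        waiters.Add (task);
        task.WaitCount++;                  // one more signal this task depends on
    }

    public void Set ()
    {
        foreach (WaitingTask task in waiters)
            if (--task.WaitCount == 0)     // last outstanding signal: reschedule
                runnable.Add (task);
        waiters.Clear ();
    }
}
```

Because a task only becomes runnable when its count drops to zero, registering it with several signals gives WaitAll-style behaviour, and several tasks can share one signal.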
```csharp
using System.Collections.Generic;

public class Signal
{
    static int nextId = int.MinValue;
    int id = nextId++;
    List<TaskItem> tasks = new List<TaskItem> ();
    bool isSet = true;

    public void Set ()
    {
        if (isSet)
            return;
        isSet = true;
        //decrement the wait count of all tasks waiting for this signal
        foreach (TaskItem task in tasks)
            if (--task.Data == 0)
                //if the wait count is zero, the task isn't waiting for any more signals, so re-schedule it
                task.Scheduler.AddToActive (task);
        tasks.Clear ();
    }

    internal void Add (TaskItem task)
    {
        //signal only becomes unset when it has tasks
        if (isSet)
            isSet = false;
        //the signal keeps a list of tasks that are waiting for it
        tasks.Add (task);
        //use the task's data for tracking the number of signals it's still waiting for
        task.Data++;
    }
}
```
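And we need to add a couple more checks to the Scheduler.Run() state handling, so that when the task yields a signal or a collection/array of signals, it's moved out of the scheduler's lists and into the signals' lists. These branches have to go before the state is IEnumerable check, because an array or other collection of signals is itself IEnumerable and would otherwise hit the NotImplementedException. The scheduler's file also needs using System.Collections.Generic for ICollection<Signal>.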
```csharp
} else if (state is Signal) {
    TaskItem task = en.RemoveCurrent ();
    task.Data = 0;
    ((Signal)state).Add (task);
} else if (state is ICollection<Signal>) {
    TaskItem task = en.RemoveCurrent ();
    task.Data = 0;
    foreach (Signal s in ((ICollection<Signal>)state))
        s.Add (task);
}
```
I have a few more ideas to improve this and turn it into a real library:
* Signal that's a ManualResetEvent analogue
* IEnumerable<ThreadState> instead of plain IEnumerable, where ThreadState is a union struct with an enum specifying its type. This could be used to avoid the boxing of TimeSpan and the type checks and casts in the scheduler: just switch on the enum value. It would also prevent consumers from returning a zero TimeSpan instead of null.
Some people may wonder why I haven't mentioned interactions with real threads. If the scheduler were thread-aware, then you could have multiple real threads on different cores consuming the microthread queue, and it would be faster with multiple cores and avoid blocking on slow tasks. The problem is not just that this increases the complexity, but that the microthreads would all have to be aware of threading too, and would need to lock all the objects they touched, and so on. This scheduler is meant to run in the equivalent of the GUI thread, purely to enable driving high-level game logic (and similar things) with an intuitive thread-like imperative programming model and minimal overhead. If it doesn't fit easily on one core, you're probably using it for the wrong thing. There might be cases where it's useful to create multiple Schedulers and run them in different threads, but be careful about what the microthreads touch.
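Going back to the IEnumerable<ThreadState> idea from the list above, it might look something like this sketch. Everything here is hypothetical: TaskStateKind, the fields and factory members of ThreadState, and ThreadStateDemo are illustrative names, not an existing API. It uses IEnumerator<ThreadState>, since that's what a scheduler actually consumes, and keeps separate fields rather than truly overlapping them. Because the generic Current returns the struct directly, nothing is boxed, and the dispatch becomes a single switch.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical discriminator for what a microthread is asking for.
public enum TaskStateKind { Yield, Sleep, Wait }

// Hypothetical "union" struct: which field is meaningful depends on Kind.
// default(ThreadState) has Kind == Yield, so a plain yield has its own explicit
// value instead of being spelled as null or a zero TimeSpan.
public readonly struct ThreadState
{
    public readonly TaskStateKind Kind;
    public readonly long SleepTicks;    // used when Kind == Sleep
    public readonly object WaitTarget;  // used when Kind == Wait; would be a Signal in the post's code

    ThreadState (TaskStateKind kind, long sleepTicks, object waitTarget)
    {
        Kind = kind;
        SleepTicks = sleepTicks;
        WaitTarget = waitTarget;
    }

    public static ThreadState Yield => default;
    public static ThreadState Sleep (TimeSpan duration) => new ThreadState (TaskStateKind.Sleep, duration.Ticks, null);
    public static ThreadState Wait (object signal) => new ThreadState (TaskStateKind.Wait, 0, signal);
}

public static class ThreadStateDemo
{
    // A microthread written against IEnumerator<ThreadState>: no boxing on yield.
    public static IEnumerator<ThreadState> Patrol ()
    {
        yield return ThreadState.Yield;
        yield return ThreadState.Sleep (TimeSpan.FromSeconds (1));
    }

    // What the scheduler's dispatch could look like: one switch, no casts.
    public static string Describe (ThreadState state)
    {
        switch (state.Kind) {
        case TaskStateKind.Yield: return "yield";
        case TaskStateKind.Sleep: return "sleep " + state.SleepTicks;
        case TaskStateKind.Wait: return "wait";
        default: throw new InvalidOperationException ("Unknown state " + state.Kind);
        }
    }
}
```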
This is part of the Catchup 2010 series of posts.
Comments
link to the source?
Hey this is awesome stuff Michael.
I reckon it would be a good idea to separate this out and turn this into a mini project.
A benchmark comparing a trivial example against a threaded version would be sweet as it would be interesting to know how much faster this approach is.
Do you think this approach would be a good idea for a socket server that runs multiple long-running tasks?
Basically, would it be beneficial to have the core socket server microthreaded, handling socket connections, and then be able to spawn new tasks on a thread pool? Then, when a task is finished, it would call back into the microthreaded socket server to handle the response?
This approach works great for
This approach works great for socket servers, since it allows you to use nonblocking operations to wait on individual connections without having to write lots of callbacks and do complex error handling - you can just use a microthread (or multiple microthreads) per connection to handle things and get error propagation through your iterators.
Here's an example telnet chat server written using a similar microthreading library (shameless plug):
http://code.google.com/p/fracture/source/browse/#svn/trunk/Squared/Examples/TelnetChatServer
It uses .NET's built in nonblocking IO support to wait on connections, so you can connect hundreds of clients to it without any significant memory/CPU overhead. Using microthreads makes it really easy to handle everything since you don't have to deal with synchronization.
TelnetChatServer looks interesting
Hey, you have an interesting project there.
Its use of async IO is similar to that of http://chat.nodejs.org, although node.js seems to work solely off timers and IO events with callbacks.
It would definitely be interesting to know how each approach fares and which would perform and scale better.
I think it will be a close one since it uses google's V8 javascript engine at its core.
In theory
In theory it should work very well as long as you use async socket API. On a similar topic, Mono's ASP.NET pipeline is actually built on iterators, and Miguel blogged about it some years ago.
All the source is contained within the post, but I'm reluctant to put it in one place as a library until it's a little more mature.
Microthreads don't actually
Microthreads don't actually have to be synchronization-aware. If you marshal all signal responses to a single thread as tasks, you get synchronization for free. I use this technique in my Squared.Task microthreading library and it works great.
For interactive applications you can even cheat by using the Win32 message pump as your scheduler via a hidden window, eliminating the need to pump your scheduler or synchronize with a separate ui thread.
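A minimal sketch of the marshalling idea, written for illustration and not taken from Squared.Task's actual code: other threads never touch microthread state directly; they enqueue a callback, and the scheduler's thread drains the queue at the start of each pass (for example, at the top of Run()). MainThreadQueue, Post and Drain are hypothetical names; ConcurrentQueue<T> is the real .NET type.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical helper: lets any thread request work on the scheduler's thread.
public sealed class MainThreadQueue
{
    readonly ConcurrentQueue<Action> pending = new ConcurrentQueue<Action> ();

    // Safe to call from any thread, e.g. an async I/O completion callback.
    public void Post (Action action) => pending.Enqueue (action);

    // Call only from the scheduler's thread. Everything posted (such as a
    // signal.Set() call) then runs on that one thread, so it needs no locks.
    public int Drain ()
    {
        int count = 0;
        while (pending.TryDequeue (out Action action)) {
            action ();
            count++;
        }
        return count;
    }
}
```

In the post's terms, a completion callback would post something like () => signal.Set (), so the Signal class itself never has to be thread-safe.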
Interesting project you have there!
Interesting project you have there!
Yes, the key point for keeping threading/synchronization simple for the microthreads themselves is that all the objects touched by the microthreads have to be safe for access from the microthreads' thread. There are certainly a few ways to do this — and though I'm not familiar with win32 and don't understand the need for a hidden window, if using GTK# I'd likely drive it via an idle handler or timeout in the GTK GUI mainloop, and keep most stuff in the GUI thread.
So. Damn. Cool.
That is all.
On a side-note; this is wildly fascinating as a state machine - never mind the microthreading.
Might as well call it out
decided to use simple iterator-based microthreading, which simulates multithreading but with many microthreads within a single real thread.
http://en.wikipedia.org/wiki/Coroutine
Right
Right, I know that the iterators are a form of coroutine, but the focus here is how they're used. I'm curious why your comment is anonymous.
Old good JavaScript threading
Looks really cool. Just look at http://www.neilmix.com/2007/02/07/threading-in-javascript-17/ for more ideas :)
Could you please provide a
Could you please provide a simple/basic example how to use it?
Example from the top of the post wouldn't compile, as scheduler.AddTask() requires IEnumerator while Patrol() is IEnumerable?
TIA,
Dejan
Sorry, I meant to start a new
Sorry, I meant to start a new thread and not a reply to olegzzzz.
LP,
Dejan
Right
Right, the Scheduler didn't quite match our initial use-case design. There are a few ways to make it work:
* Get the enumerator from the Patrol IEnumerable:
* Have an overload of Scheduler.AddTask that gets the Enumerator from an IEnumerable:
* Have Patrol() return IEnumerator instead of IEnumerable. The C# compiler can generate that directly, and this will also save you an object allocation.
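The three options can be sketched side by side. TaskRegistry is a hypothetical stand-in for the post's Scheduler that only records what was added, and PatrolEnumerable/PatrolEnumerator are illustrative versions of the Enemy1.Patrol iterator; only the AddTask signatures matter here.

```csharp
using System.Collections;
using System.Collections.Generic;

// Hypothetical stand-in for Scheduler: it just records registered tasks.
public sealed class TaskRegistry
{
    public readonly List<IEnumerator> Tasks = new List<IEnumerator> ();

    // The signature the post's Scheduler actually has.
    public void AddTask (IEnumerator task) => Tasks.Add (task);

    // Option 2: an overload that gets the enumerator from an IEnumerable itself.
    public void AddTask (IEnumerable task) => AddTask (task.GetEnumerator ());
}

public static class PatrolOptions
{
    // As in the original example: the compiler generates an IEnumerable.
    public static IEnumerable PatrolEnumerable ()
    {
        yield return null;
    }

    // Option 3: the compiler can generate an IEnumerator directly.
    public static IEnumerator PatrolEnumerator ()
    {
        yield return null;
    }

    public static void Register (TaskRegistry scheduler)
    {
        scheduler.AddTask (PatrolEnumerable ().GetEnumerator ()); // option 1
        scheduler.AddTask (PatrolEnumerable ());                  // option 2, via the overload
        scheduler.AddTask (PatrolEnumerator ());                  // option 3
    }
}
```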
Thanks.
Thanks.
How about memory allocations?
Very interesting, out-of-the-box solution.
I was wondering, have you measured how much the Run() method touches the heap (assuming the microthreads do not allocate memory)?
It's fairly straightforward
The scheduler will read and alter quite a few heap objects, because each task is encapsulated in a TaskItem object. Allocations are much more limited. I haven't measured it, but it's fairly straightforward. At a quick scan, the heap object creations are as follows:
In the scheduler:
* Adding a task will create a TaskItem object
* Each call to Run() will create 2 list enumerators
* Moving tasks to signals might allocate arrays for the signals' lists
In the threads:
* Yielded TimeSpan structs will be boxed, i.e. creating heap objects
* Yielded signals might have to be created by whatever API the thread gets them from
To cut down allocations you could eliminate the list enumerators and walk the lists directly in Run(), or implement a Reset() method on the enumerators and re-use them. You could eliminate the boxing of TimeSpan structs by using IEnumerable<SomeTaskStateStruct> or implementing some kind of re-usable box object. The others are unavoidable.
On pipelines and stuff
Did you have a look at this lib from Jon "yoda" Skeet? http://www.yoda.arachsys.com/csharp/miscutil/ ;-)
Saving the state machine to disk
Nice!
By the way, it is possible to save the compiler-generated state machine to disk, and then resume it days or weeks later from the disk file. I posted a proof of concept here. But, unfortunately, it's not really suitable for production use because Microsoft say that they may change the way the state machine works, which could make it impossible to re-load the saved file after upgrading to a new version of the runtime.
So I guess that non-persistent cases like your nice one above are the way to go for now.
John
Software Design
Designing a software with source code is like describing a music with words : inadequate.
Durham graduate doing it, is plainly tragic ...
DBJ