Thursday, July 30, 2009

The Joy of Rx: The Event-based Async Pattern vs. IObservable

In part 1 we talked about how to convert events to IObservables.  However Rx isn't just about querying events, it's also about querying asynchronous operations. 

There are two common asynchronous patterns in the .NET framework: the event-based asynchronous pattern and Begin/EndInvoke.  Currently MSDN recommends using the event-based asynchronous pattern where possible.  This recommendation may or may not change when IObservable is released with .NET 4.0.  I want to make clear that the opinions expressed in this post are my own personal views and are not consistent with official MSDN guidance.

Don't use the event-based asynchronous patternOr if you must use the event-based asynchronous pattern be sure to also provide a Begin/EndInvoke version of the API that uses IAsyncResult.  Why?  Because the event-based asynchronous pattern is deeply flawed.  To understand why let's examine a typical piece of code that uses the event-based asynchronous pattern.

Guid token = Guid.NewGuid(); var webClient = new WebClient(); // We need to refer to the identifier within the body // of the method so we must first initialize it to null. // This means we can't use type inference to avoid // having to clutter our code with this absurdly long // (and entirely unnecessary) delegate type. DownloadStringCompletedEventHandler handler = null; handler = (o, a) => { if (((Guid)a.UserState) != token) return; // unhook from the event so that we don't keep firing // after we've gotten our data asynchronously. webClient.DownloadStringCompleted -= handler; if (a.Error != null) { // Handle exception. This may include throwing but // we may also have to invent some method of manually // propagating it if we don't have the knowledge to // handle it at this point and we are in the middle of several //other asynchronous operations. } if (!a.Cancelled) { Debug.WriteLine(string.Format("The downloaded HTML is {0}.", a.Result)); } } // Notice that the only link between the method and the event that returns // its data is a simple naming convention. There's no way to know for _sure_ // which event will return a method's data. client.DownloadStringCompleted += handler; client.DownloadStringAsync(new Uri("http://www.jeffwilcox.com"), token);

A thoroughly awful piece of code no?  The good news is that by adding an extension event to the WebClient class we can wrap the event-based asynchronous pattern in an IObservable.  Using Rx works around all of the the issues pointed out in the comments above. 

Of course it's rather cumbersome to create a new class that inherits from IObservable whenever we need to return the result of an asynchronous operation.  It also seems a little silly given that IObservable only has one method: Subscribe.  Given this fact it's helpful to create an AnonymousObservable class which accepts an action and invokes it when the Subscribe method is called.

internal class AnonymousObservable<T> : IObservable<T> { private Func<IObserver<T>, IDisposable> subscribeAction; public AnonymousObservable(Func<IObserver<T>, IDisposable> subscribeAction) { this.subscribeAction = subscribeAction; } public IDisposable Subscribe(IObserver<T> observer) { return subscribeAction(observer); } }

Now that we've got our handy, reusable AnonymousObservable class we're ready to create our GetDownloadString extension event.

public static class WebClientExtensions { // No need to put Async in the method title, it's implicit given that // an observable is being returned. public static IObservable<string> GetDownloadString(this WebClient client, Uri address) { // Delay action by nesting it in an observable. // Nothing should ever happen until a client subscribes to the // observable, just as nothing should happen until an // IEnumerable is traversed. return new AnonymousObservable<string>( observer => { // Several downloads may be going on simultaneously. // The token allows us to establish that we're retrieving // the right one. Guid token = Guid.NewGuid(); var stringDownloaded = Observable.FromEvent<DownloadStringCompletedEventArgs>(client, "DownloadStringCompleted") // Confirm its our download using captured state variable .Where(evt => ((Guid)evt.EventArgs.UserState) == token) .Take(1); //implicitly unhooks handler after event is received bool errorOccurred = false; // Subscribe to the IObservable. Under the hood Rx // creates an anonymous observer class that invokes // the three actions passed to the Subscribe method. IDisposable unsubscribe = stringDownloaded.Subscribe( // OnNext action ev => { // Propagate the exception if one is // reported. After this, all observers // will continue to propagate this // exception via the OnError method call // until it is caught. if (ev.EventArgs.Error != null) { errorOccurred = true; observer.OnError(ev.EventArgs.Error); } else if (!ev.EventArgs.Cancelled) { observer.OnNext(ev.EventArgs.Result); } }, // OnError action (propagate exception) ex => observer.OnError(ex), // OnCompleted action () => { // No need to call OnCompleted if // there has been an exception. // It is implicit that there will // be no more data. if (!errorOccurred) { observer.OnCompleted(); } }); client.DownloadStringAsync(address, token); return unsubscribe; }); } }

Granted converting the event-based asynchronous pattern to IObservable requires a little bit of code, but the complexity is pushed onto the library developer.  Look at how easy it is to consume the IObservable API.

var webClient = new WebClient(); webClient .GetDownloadString(new Uri("http://www.jeffwilcox.com")) .Subscribe(html => Debug.WriteLine(html));

No need to fuss with event argument objects.  The IObservable/IObserver objects take on the concerns of cancelling and propagating exceptions.  The developer is given exactly what he or she expects: an asynchronously downloaded string containing the contents of a web page.

Building Complex Queries by Combining Asynchronous Operations

Now that we've got our handy GetDownloadString extension method we can build some pretty complex queries with it.  Let's use it to download the HTML from the first ten results pages of a Bing search at once. 

public static class EnumerableExtensions { // This is a very handy extension method missing from Linq. public static IEnumerable<T> Iterate<T>(T initalValue, Func<T, T> next) { yield return initalValue; while (true) { initalValue = next(initalValue); yield return initalValue; } } } // snip... var client = new WebClient(); var uri = "http://www.bing.com/search?q=Rx+framework&first={0}&FORM=PERE3"; var pageSize = 10; var resultIndices = EnumerableExtensions.Iterate(1, prev => prev + pageSize).Take(10); var pageDownloads = resultIndices .Select( resultIndex => client .GetDownloadString(new Uri(string.Format(uri, resultIndex))) // append the result index to the results so we can sort by it later .Select(html => new { ResultIndex = resultIndex, Html = html })) // Merge all of the observables into one so that we can subscribe to all of // them simultaneously. .Aggregate( (simultaneousDownloads, currentDownload) => Observable.Merge(simultaneousDownloads, currentDownload)); var concatenatedHtml = pageDownloads // Convert to a blocking enumerable. It makes no sense for // IObservable to have a GroupBy or OrderBy method because // both these methods must block until all the data is received. // However all of the downloads will begin simultaneously when // we traverse the IEnumerable .ToEnumerable() // Order by the result index .OrderBy(pageDownload => pageDownload.ResultIndex) // Grab the HTML .Select(pageDownload => pageDownload.Html) // Concatenate the HTML strings .Aggregate((accumulatedHtml, html) => accumulatedHtml + html) // Grab the first (and only) result. .First();

Pretty slick right?  There's only one problem. This code doesn't work :-(.  It will block forever.  The problem is that the event-based asynchronous pattern always returns on the thread it originates from and that thread is blocked by the call to GetEnumerator()

In the past returning on the UI thread was convenient because developers didn't have to worry about cross-thread accesses.  With Rx hopping threads is so easy that cross-thread access isn't much of a concern.

var uiContext = SyncrhonizationContext.Current; AsyncAction.Post(uiContext).Subscribe(() => Debug.Write("Now I'm back on the UI thread!"));

Unfortunately the fact that the event-based async pattern hops the the UI thread makes it much more difficult to compose a set of asynchronous operations together and block on the result.  This is why it's important to always provide a Begin/End Invoke version of your event-based, asynchronous API's.

WebClient doesn't provide a Begin/End overload for DownloadStringAsync so to work around the blocking problem we'll have to create an non-blocking OrderBy method for IObservable.

public static class ObservableExtensions { public static IObservable<T> OrderBy<T, R>(this IObservable<T> that, Func<T, R> keySelector) { return new AnonymousObservable<T>( observer => { var orderBySubject = new OrderBySubject<T, R>(keySelector); var unsubscribe = orderBySubject.Subscribe(observer); that.Subscribe(orderBySubject); return unsubscribe; }); } public class OrderBySubject<T,R> : Subject<T> { private Func<T,R> keySelector { get; set; } public OrderBySubject(Func<T, R> keySelector) { this.keySelector = keySelector; } List<T> list = new List<T>(); public override void OnNext(T value) { list.Add(value); } public override void OnCompleted() { foreach (var item in list.OrderBy(keySelector)) { base.OnNext(item); base.OnCompleted(); } } } }

Now we can rewrite the code above asynchronously.

// snip... var concatenatedHtml = pageDownloads // Order by the result index .OrderBy(pageDownload => pageDownload.ResultIndex) // Grab the HTML .Select(pageDownload => pageDownload.Html) // Concatenate the HTML strings .Aggregate((accumulatedHtml, html) => accumulatedHtml + html) // Write the first (and only) result. .Subscribe(html => Debug.Write(html));

Of course I could've avoided the blocking problem by using the WebRequest object because it has Begin/End methods.  However the point of this article was to show that although the event-based asynchronous pattern can be converted to an IObservable the abstraction leaks.

Think Ahead When Designing Asynchronous APIs

If Rx is embraced (which I expect it will be) existing event-based asynchronous APIs will be more cumbersome to work with than those written using the alternative Begin/End Invoke pattern.  I ask developers to consider this when designing asynchronous APIs today.

The Joy of Rx: Extension Events

Based on the logs it seems there is tremendous interest in Rx, the new .NET 4.0 interfaces that allow you to query events and asynchronous operations.  I can't say I'm surprised.  In my opinion Rx is the big story of .NET 4.0.  I've been developing software using Rx for months now and I thought I'd share some tips and tricks that I've picked up with the community.  You can get the Sivlerlight version of Rx from the binaries folder in the Silverlight Toolkit sources.

Converting Events to IObservables

One of the cumbersome things about Rx development is that events must be converted to IObservable's before they can be queried.  Rx provides a static FromEvent method to help with the conversion:

var mouseMove = Observable.FromEvent<MouseEventArgs>(Application.Current.RootVisual, "MouseMover"); mouseMove.Subscribe(() => Debug.WriteLine("the mouse has been moved.");

Unfortunately putting event names (or any identifiers for that matter) in strings breaks our refactoring tools.  Rx does provide another overload which accepts two actions, one that attaches the handler and another that detaches it...

class MyClass { public EventHandler<RoutedEventArgs> MyEvent; } var myClass = new MyClass(); var myClassEvent = Observable.FromEvent<RoutedEventArgs>(handler => myClass.MyEvent += handler, handler => myClass.MyEvent -= handler); mouseMove.Subscribe(() => Debug.WriteLine("the mouse has been moved.");

...but not only is this overload rather more verbose, it only works if the delegate type of your event is the generic EventHandler<T>.  Unfortunately most events use custom delegate types because in .NET 1.0 generics did not exist. 

**Updated:  There is an overload which I missed that accepts the delegate type:

var myButton = new Button();
var myButton = Observable.FromEvent<RoutedEventHandler, RoutedEventArgs>(handler => myClass.Click += handler, handler => myClass.Click -= handler);

So what's the best way of exposing events as IObservables?

Extension Events

If you're doing Rx development in C# it's good practice to create extension methods for each event you would like to query.  You can think of these methods as extension events.  To demonstrate let's create a static class with extension methods that expose the events on Silverlight/WPF's UIElement class as IObservables.

internal static class UIElementExtensions { public static IObservable<Event<MouseButtonEventArgs>> GetMouseLeftButtonDown(this UIElement that) { return Observable.FromEvent<MouseButtonEventArgs>(that, "MouseLeftButtonDown"); } public static IObservable<Event<MouseButtonEventArgs>> GetMouseLeftButtonUp(this UIElement that) { return Observable.FromEvent<MouseButtonEventArgs>(that, "MouseLeftButtonUp"); } public static IObservable<Event<MouseEventArgs>> GetMouseLeave(this UIElement that) { return Observable.FromEvent<MouseEventArgs>(that, "MouseLeave"); } public static IObservable<Event<MouseEventArgs>> GetMouseEnter(this UIElement that) { return Observable.FromEvent<MouseEventArgs>(that, "MouseEnter"); } }

Unfortunately our event names are stored in strings, but at least they are in one place.  Now that we've created extension methods that expose IObservables we can create more complex events by sequencing these primitive events.

Sequencing Events

The most exciting thing about Rx is that it enables you build complex events from a sequence of primitive events.  Let's create an observable that fires when the sequence of keys "a", "b", "c" is pressed:

// create an event that listens for key presses and returns the // key pressed. IObservable<Key> keyPress = Observable.FromEvent<KeyEventArgs>(Application.Current.RootVisual, "KeyUp") .Select(ev => ev.EventArgs.Key); // Create a helper function for creating observables that fire // when a specific key is pressed. Func<Key, IObservable<Key>> pressedIs = key => keyPress.Where(pressedKey => pressedKey == key); // Create a helper function for creating observables that fire // when a key other than a specific key is pressed. Func<Key, IObservable<Key>> pressedIsNot = key => keyPress.Where(pressedKey => pressedKey != key); IObservable<Unit> abcPressed = // Always listen for a key press "A" because it is the start of the sequence. // Each time the "A" key is pressed we being matching the sequence again. from firstKeyPressEvent in pressedIs(Key.A) // After "A" is pressed when only want to wait for a single "B" key press. // If any other key is pressed we start at the beginning and wait for "A" from secondKeyPressEvent in pressedIs(Key.B).Take(1).Until(pressedIsNot(Key.B)) // After "B" is pressed when only want to wait for a single "C" key press. // If any other key is pressed we start at the beginning and wait for "A" from thirdKeyPressEvent in pressedIs(Key.C).Take(1).Until(pressedIsNot(Key.C)) // I could return the string "abc" here but we know what exactly what keys // were pressed because this event is so specific. I really don't have anything // to return but all queries must return something. In cases like this we return the // Unit value. It's like returning null, but it has a type and can't cause a null // reference exception. select new Unit(); abcPressed.Subscribe(()=> Debug.WriteLine("ABC was pressed."));

Just as in Linq to Objects, multiple uses of the "from" keyword are translated into nested calls to the SelectMany extension method.  The abcPressed query above could also be coded this way:

IObservable<Unit> abcPressed = pressedIs(Key.A) .SelectMany( firstKeyPressEvent => pressedIs(Key.B).Take(1).Until(pressedIsNot(Key.B)) .SelectMany( secondKeyPressEvent => pressedIs(Key.C).Take(1).Until(pressedIsNot(Key.C)))) .Select(_ => new Unit());

Building a "Click" Extension Event

Now that we know how to sequence events let's use Rx to add a "Click" extension event to all instances of UIElement - a class from which all Controls inherit.  The sequence of events that constitute a click event are a little more complex than one might think.  In a nutshell we consider a click event on UIElement "A" has occurred if we match this sequence of events...

1.  MouseLeftButtonDown over UIElement "A"

2.  MouseLeftButtonUp over UIElement "A"

...or this sequence of events...

1.  MouseLeftButtonDown over UIElement "A"

2.  MouseLeave UIElement "A"

3.  MouseEnter UIElement "A"

4.  MouseLeftButtonUp over UIElement "A"

...but NOT this sequence of events:

1.  MouseLeftButtonDown over UIElement "A"

2.  MouseLeave UIElement "A"

3.  MouseEnter UIElement "B"

3.  MouseLeftButtonUp over UIElement "B"

3.  MouseLeftButtonDown over UIElement "B"

2.  MouseLeave UIElement "B"

3.  MouseEnter UIElement "A"

4.  MouseLeftButtonUp over a UIElement "A"

Let's add a GetClick extension event to the UIElementExtensions class we defined earlier.

public static IObservable<Event<MouseButtonEventArgs>> GetClick(this UIElement that) { return that // wait for any mouse left down event .GetMouseLeftButtonDown() .SelectMany( mouseLeftButtonDownEvent => // then wait for a single mouse left up event that .GetMouseLeftButtonUp() .Take(1) .Until( // We want to merge two different stop conditions... Observable.Merge( // stop listening if the mouse goes outside // the silvleright plug-in Application.Current.RootVisual.GetMouseLeave() // We return unit so that we have the // same type as the other observable // we want to merge with .Select(_ => new Unit()), // stop listening if the mouse goes outside the // element and the mouse is released. that .GetMouseLeave() .SelectMany( mouseLeaveEvent => // stop waiting for a mouse left up event // if the mouse leaves the element and the // button is released. // By listening for the event at the Root // Visual we ensure that we will get all // MouseLeftButtonUp events because this // event bubbles up. Application.Current.RootVisual .GetMouseLeftButtonUp() .Take(1) // Return unit so that we can merge .Select(_ => new Unit()) // don't cancel if the mouse enters // the element over which the mouse // was depressed. .Until(that.GetMouseEnter()))))); }

That's it!  Now every single instance of UIElement has a Click event we can subscribe to:

var rectangle = new Rectangle { Width = 100, Height = 100, Fill = new SolidColorBrush(Colors.Red) }; rectangle.GetClick().Subscribe(() => Debug.WriteLine("The rectangle was clicked.");

More Reliable Code, and Less of It

Rx allows you to write complex, asynchronous code declaratively.  If you master it you'll never have to explicitly unhook a handler from an event again.  You also wont ever have to maintain an error-prone collection of state variables in order to ascertain whether a sequence of events has occurred in a particular order.

*Edit: Here's the code!

Wednesday, July 22, 2009

Introducing Rx (Linq to Events)

It’s the most wonderful time of the year: a new version of the Silverlight Toolkit has been released alongside Silverlight 3.  This release of the Toolkit has a lot of goodies including a new TreeMap control, a Rating control (written by yours truly), and a useful collection of extensions methods for TreeView.  That said this post is not really about the Toolkit.   Buried deep in the bin folder of the Silverlight Toolkit Unit Tests is a hidden gem: The Rx Framework (System.Reactive.dll).  If you glanced quickly you’d miss it altogether but it’s one of the most exciting additions to the .NET framework since Linq.

Stating the Obvious: Asynchronous Programming is Hard

Developers tend to avoid asynchronous programming if possible because it makes our programs non-deterministic and obscure our code’s intent in a sea of callbacks.  However the truth is that asynchronous programming has become an essential part of application development.  Client apps have always needed to use asynchronous methods to keep the user interface responsive.  As a matter of fact Silverlight developers don’t have any choice in the matter because Silverlight doesn’t include any blocking IO calls.  Connected apps also need to take advantage of asynchronous programming to improve scalability.  Mashups, apps cobbled together from web services, often need to retrieve data from multiple sources in a particular order. 

Clearly we need to write asynchronous code to create modern, connected applications, but how to do it in such a way that our code remains clear and maintainable?

Introducing the Rx Framework

The IEnumerable Interface

We’re all familiar with the IEnumerable interface.  Almost every collection implements it and we use it every time we write a foreach.  Most of us are also pretty comfortable using Linq to query IEnumerables.  Linq is a series of extension methods used to manipulate sequences.  Here’s a simple example:

int[] numbers = new int[]{20,31,5,16,22};
IEnumerable<int> numbersSmallerThan20 = numbers.Where(number => number < 20);

The resulting numbersSmallerThan20 sequence looks like this when visualized:

5, 16, break

In addition to finite sequences it can sometimes be useful to create sequences that never end.  Take this method that returns an infinite sequence of integers:

IEnumerable<int> NaturalNumbers()
{
     int number = 0;
     while(true)
    {
        yield return number;
        number++;
    }
}

When visualized this sequence looks like this:

0,1,2,3,4,5,6…

Enumerable’s are sequences of data that we pull from a data source.  We don’t always pull data though.  Often it is pushed onto us and we must react appropriately.  This is called “Reactive Programming.”

Reactive Programming

Reactive programs are ubiquitous.  We use reactive programming every time we register a handler with an event or a specify a call back for an asynchronous operation.  In this example I register a handler for the mouse move event of a button.  The handler prints out the location of the mouse when it is called.

 

button.MouseMove += (o, mouseEventArgs) => Debug.Writeline(“You moved the mouse to {0}”, mouseEventArgs.GetPosition(button));


Every time the mouse event is fired our callback method is invoked and we are passed some information about the event. 

“You moved the mouse to 20,3”

“You moved the mouse to 33,12”

“You moved the mouse to 44,18”

Another form of reactive programming is running an asynchronous method and passing it a callback method to invoke when it’s finished.  This is how we keep our programs responsive during long-running operations.  Here’s an example:

DownloadFile(“http://www.jeffwilcox.com”, (byteArray) => Debug.WriteLine(“This file is {0} bytes long.”, byteArray.Length);

When this method finishes downloading a file it passes the data to our callback function giving us the following result:

“This file is 12323 bytes long.”

We’re all used to doing reactive programming by specifying methods that are called at unpredictable times, but what if there was a different way to think about reactive programming?  What if we thought of each piece of data passed to a reactively-called method as an item in a sequence?

Events and Callbacks are Sequences of Data!

The data passed to event handlers and callbacks can be thought of as sequences of data that are “pushed” at you rather than “pulled.”  Every time an event is fired we get “pushed” a new piece of data: the EventArgs.  Similarly when a callback is invoked it is typically “pushed” the result of the asynchronous method.  You can think of an event as a sequence of EventArgs that never ends just like the NaturalNumbers sequence.  If you were to visualize the mouse move event as a sequence it would look like this:

new MouseEventArgs(new Point(20,3)), new MouseEventArgs(new Point(33,12)), new MouseEventArgs(new Point(44,18))…

Similarly the asynchronous DownloadFile method can be viewed as a “push” sequence of data with only one entry:

new byte[]{23,211,33,23…}, break

The Rx team has discovered that pull sequences and push sequences are “dual.”  That is to say, any operation you can perform on “pull” sequences can also be performed on “push” sequences.  This is quite a revelation.  To put things in perspective it’s been 13 years since Design Patterns was published and we’ve only now realized that the Observable pattern and the Iterator pattern are actually the same pattern.

Although reactive “push” sequences are fundamentally the same as “pull” sequences the IEnumerable interface can’t be used for reactive sequences because it blocks.  We need a new interface with non-blocking methods that correspond exactly to the blocking methods on IEnumerable.  We need…

The IObservable/IObserver Interface

Despite the fact that they may look somewhat different on the surface the IObservable/IObserver pair of interfaces are the non-blocking equivalents of IEnumerable/IEnumerator.

clip_image001

To help you understand how they are equivalent let’s take a look at a simple example of traversing a pull sequence.  You enumerate an IEnumerable by requesting an IEnumerator.  As you call MoveNext the IEnumerator “pulls” data from the IEnumerable, usually by invoking its methods.

IEnumerator<int> numberEnumerator = new int[]{1,2,3};
while(numberEnumerator.MoveNext())
{
     Debug.WriteLine(“{0}”, numberEnumerator.Current);
}

Debug.WriteLine(“all done.”);

This prints:

1
2
3
all done.

To traverse an IObservable you go through the same actions as an IEnumerable but in reverse.  You create an IObserver, give it to an IObservable, and the IObservable “pushes” data into the IObserver by invoking its methods.  When an IObservable invokes the “OnNext” method on an Observer it is equivalent to an IEnumerable method using the yield keyword to give information to an IEnumerable.  Similarly when an IObservable invokes the “OnCompleted” method on an Observer it is equivalent to an IEnumerable using the break keyword to indicate that there is no more data.

Let’s define a NumbersObserver which converts a “pull” sequence of numbers into a “push” sequence as well as an observer that listens to our NumbersObserver and prints its contents.

 

internal class AnonymousDisposable : IDisposable
{

    internal Action Action {get; internal set;}

    void IDisposable.Dispose()
    {
        this.Action();
    }
}

class NumbersObservable : IObservable<int>
{
    public NumbersObservable(IEnumerable<int> numbers)
    {
        this._numbers = numbers;
    }

    private IEnumerable<int> _numbers;

    public IDisposable Subscribe(IObserver<int> observer)
    {
         foreach(int number in _numbers)
        {
             observer.OnNext(number);
        }
        observer.OnCompleted();

        return new AnonymousDisposable { Action = () => { ; // do nothing because we’ve already called OnCompleted() } }; 
    }
}

class DebugObserver : IObserver<int>
{
    public void OnNext(int value)
    {
        Debug.WriteLine(“{0}”, value);
    }

    public void OnCompleted()
    {
        Debug.WriteLine(“all done.”);
    }

    public void OnError(Exception ex)
    {
        Debug.WriteLine(“Whoops exception, I’d better throw.”)
        throw ex;
    }
}

Now let’s use these classes to create a “push” version of our “pull” example:

new NumbersObservable(new[]{2,3,4}).Register(new DebugObserver());

This prints…

1
2
3
all done.

“Whoa, whoa!  Isn’t this pretty complicated?”

Don’t worry.  This example above is just to demonstrate the interplay between the interfaces.  Most of the time you won’t have to implement your own Observable or Observer.  Rx includes lots of methods for constructing observables and observers.  Using Rx extension methods I can rewrite the code above like so:

new[]{1,2,3}.ToObservable().Subscribe(number => Debug.WriteLine(“{0}”, number));

“What’s with the IDisposable object?”

The IDisposable object is returned by an Observable when you register an observer with it.  When you invoke the Dispose method on the registration object the observer will stop listening to the observable for data.  This is the active equivalent of passively not calling MoveNext() anymore in the middle of a sequence.  Rather than invoke the Dispose method directly you will most often have it invoked for you by Rx.  In the following example the TakeWhile method will invoke Dispose under the hood to detach from an observable as soon as a number larger than 10 is returned.

var numbersSmallerThanTen = Enumerable.Range(0,100).ToObservable().TakeWhile(x => x <= 10);

Here’s an example of invoking Dispose explicitly to detach from an event which has been converted to an Observable:

// Use an Rx method to convert an event to an Observable
IObservable<Event<MouseEventArgs>> mouseMoveEventObservable = Observable.FromEvent<MouseEventArgs>(myControl, “MouseMove”);

// register a handler with the event using an overload that accepts a lambda instead of an Observer
IDisposable registration = mouseMoveEventObservable.Subscribe(mouseMoveEvent => Debug.Write(“The mouse was moved.”));

// stop listening to the event
registration.Dispose();

“What about the OnError method?  I don’t see an equivalent for that in IEnumerable or IEnumerator either.”

When errors occur in asynchronous operations the exception must be passed to the callback method so that the callback method can handle it.  That’s why this method exists in IObservable but seems to have no equivalent in IEnumerable.  In IEnumerable it is implicit because you can use try/catch.

Linq to IObservable

Now that we understand that an IObservable is just a “push” version of IEnumerable it just be obvious that all of the familiar Linq methods apply to it.  In fact it is equally appropriate to use query syntax on “pull” sequences and “push” sequences.  Both are queries in the strictest sense and the fact that a sequence is push or pull is orthogonal.  Let’s analyze a typical Linq query:

IEnumerable<Point> points =
     from x in Enumerable.Range(0, 2)
     from y in Enumerable.Range(0, 2)
     select new Point(x,y);

A verbal description of this query might be:

“For each x in the sequence [0 to 1] get each y in the sequence [0 to 1] and create a new point for each pair of values.”

The result of course is:

0,0
0,1
1,0
1,1

Now let’s contrast this with an Rx query that creates a dragging event for a Silverlight/WPF control:

IObservable<Event<MouseEventArgs>> draggingEvent =
     from mouseLeftDownEvent in control.GetMouseLeftDown()
     from mouseMoveEvent in control.GetMouseMove().Until(control.GetMouseLeftUp())
     select mouseMoveEvent;

A verbal description of this query might be:

“For each mouse left down event, get each mouse move event and return it until the next mouse left up event occurs.”

As you can see, using from” allows us to declaratively sequence events.  The alternative would be to create a state machine, setting a flag when the mouse button is pressed and then behaving differently when the mouse is moved and that flag is set.  With Rx the code for the drag event is self-contained and involves no variable mutation.

With Linq to IEnumerable we transform and combine sequences of data to create a sequence containing exactly the data we need.  Then we traverse that sequence and do something with the data.  With  Linq to IObservable we can transform and combine events and async callbacks to create the precise event we’re interested in.  Then we register a handler and do something with the data.

Silverlight Toolkit Unit Test Code Written with Rx

The Silverlight Toolkit team is using Rx to write reliable, event-based asynchronous tests.  This is essential as the elements in a control’s visual tree are created asynchronously, forcing us to wait for an event in order to confirm they were created appropriately. Let’s take a look at a test for Rating that uses Rx.

This test ensures that the Actual Value of a RatingItem is %100 when its parent Rating is %100.  This is tricky because when you change the value of rating it animates to the new value using an internal storyboard.  I have to wait for ActualValue to be animated to the Value property before I examine the RatingItem.

The test creates a Rating control, places it on screen, and waits for LayoutUpdated to ensure that the rating items are generated.  Then it asynchronously sets the value of rating to 1.0.  As the ActualValue of the Rating changes, the various RatingItems will have their ActualValue’s set accordingly depending on their index.  The test needs to wait until the Rating’s ActualValue reaches Value before checking to make sure the ActualValue of the last rating item is 1.0.

 

Rating rating = new Rating();
IObservable<Unit> test =                             // Unit is an object that represents null.
    ObservableExtensions
        .DoAsync(() => TestPanel.Children.Add(rating))
        .WaitFor(TestPanel.GetLayoutUpdated())     // Extension method GetLayoutUpdated converts the event to observable
        .DoAsync(() => rating.Value = 1.0)        // Calls the Ignite EnqueueCallback method
        .WaitFor(                                 // waits for an observable to raise before going on
            // listen to all the actual value change events and filters them until ActualValue reaches Value
            rating                          
            .GetActualValueChanged()        // extension method that converts ActualValueChanged event to IObservable
            .SkipWhile(actualValueChangedEvent => actualValueChangedEvent.EventArgs.NewValue != rating.Value))
        // check to make sure the actual value of the rating item is set appropriately now that the animation has completed
        .Assert(() => rating.GetRatingItems().Last().ActualValue == 1.0) // crawls the expression tree and makes a call to the appropriate Assert method

Test.Subscribe(() => TestPanel.Children.Remove(rating));    //run the test and clean up at the end.

The code above uses a variety of extension methods we built to manipulate observable objects.  You can use these libraries in your own unit tests by downloading the Silverlight Toolkit sources.

Always Useing IObservable for New Asynchronous APIs

The IObservable/IObserver interfaces are in .NET framework 4.0I want to stress that IObservable is the new asynchronous programming pattern in .NET. It supplants the Begin/EndInvoke pattern as well as the event-based asynchronous patternSimple run of thumb: if the method is asynchronous, return an IObservable. *

*A correction here.  It is still perfectly acceptable to use Begin/End Invoke or the event-based asynchronous pattern.  Large portions of the framework use these patterns and will continue to do so for the sake of consistency.

Exposing IObservable is like putting lighting in a bottle.  Developers can open it up and get access to a galaxy of Linq methods they can use to combine and sequence them with other IObservables.  My hope is that eventually API’s exposing IObservable will be just as common as those exposing IEnumerable.

Erik Meijer Strikes Again

Rx is the brainchild of Erik Meijer, the father of Linq and recent recipient of the Outstanding Technical Leadership award at Microsoft.  Erik is the reason I chose to work for Microsoft.  With the introduction of Rx and his work on Linq and Haskell he has profoundly changed the way I approach software development twice in four years – an incredible feat.  Thanks to Meijer, Microsoft does a better job than anyone of taking bleeding-edge functional programming research and productizing it.  The Rx team also includes my favorite blogger, Wes Dyer whose blog posts opened my eyes to what an incredibly versatile language C# is.

After using it for the last few months it’s now impossible to imagine doing Silverlight development without Rx.  As of today the only place you can get it is in the Silverlight Toolkit sources.  Take a look.  If you are comfortable with Linq programming you’ll find it to be extremely powerful.

About Me

My photo
I'm a software developer who started programming at age 16 and never saw any reason to stop. I'm working on the Presentation Platform Controls team at Microsoft. My primary interests are functional programming, and Rich Internet Applications.