In part 1 we talked about how to convert events to IObservables. However, Rx isn't just about querying events; it's also about querying asynchronous operations.
There are two common asynchronous patterns in the .NET framework: the event-based asynchronous pattern and Begin/EndInvoke. Currently MSDN recommends using the event-based asynchronous pattern where possible. This recommendation may or may not change when IObservable is released with .NET 4.0. I want to make clear that the opinions expressed in this post are my own personal views and are not consistent with official MSDN guidance.
Don't use the event-based asynchronous pattern. Or, if you must use it, be sure to also provide a Begin/EndInvoke version of the API that uses IAsyncResult. Why? Because the event-based asynchronous pattern is deeply flawed. To understand why, let's examine a typical piece of code that uses it.
    Guid token = Guid.NewGuid();
    var webClient = new WebClient();

    // We need to refer to the identifier within the body
    // of the method so we must first initialize it to null.
    // This means we can't use type inference to avoid
    // having to clutter our code with this absurdly long
    // (and entirely unnecessary) delegate type.
    DownloadStringCompletedEventHandler handler = null;
    handler = (o, a) =>
    {
        if (((Guid)a.UserState) != token) return;

        // Unhook from the event so that we don't keep firing
        // after we've gotten our data asynchronously.
        webClient.DownloadStringCompleted -= handler;

        if (a.Error != null)
        {
            // Handle exception. This may include throwing but
            // we may also have to invent some method of manually
            // propagating it if we don't have the knowledge to
            // handle it at this point and we are in the middle of several
            // other asynchronous operations.
        }
        else if (!a.Cancelled)
        {
            // Only read Result when there was no error; accessing it
            // after a failed download throws.
            Debug.WriteLine(string.Format("The downloaded HTML is {0}.", a.Result));
        }
    };

    // Notice that the only link between the method and the event that returns
    // its data is a simple naming convention. There's no way to know for _sure_
    // which event will return a method's data.
    webClient.DownloadStringCompleted += handler;
    webClient.DownloadStringAsync(new Uri("http://www.jeffwilcox.com"), token);
A thoroughly awful piece of code, no? The good news is that by adding an extension method to the WebClient class we can wrap the event-based asynchronous pattern in an IObservable. Using Rx works around all of the issues pointed out in the comments above.
Of course, it's rather cumbersome to create a new class that implements IObservable whenever we need to return the result of an asynchronous operation. It also seems a little silly given that IObservable only has one method: Subscribe. Given this fact, it's helpful to create an AnonymousObservable class which accepts a delegate and invokes it whenever the Subscribe method is called.
    internal class AnonymousObservable<T> : IObservable<T>
    {
        private Func<IObserver<T>, IDisposable> subscribeAction;

        public AnonymousObservable(Func<IObserver<T>, IDisposable> subscribeAction)
        {
            this.subscribeAction = subscribeAction;
        }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            return subscribeAction(observer);
        }
    }
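To see the deferred behaviour in isolation, here is a minimal sketch that exercises an AnonymousObservable without any WebClient involved. It relies only on the IObservable<T>/IObserver<T> interfaces in the System namespace (.NET 4.0 and later). RecordingObserver, EmptyDisposable and Demo are hypothetical names invented for this illustration, and the AnonymousObservable class is repeated so that the sketch compiles on its own.

```csharp
using System;
using System.Collections.Generic;

// Same shape as the AnonymousObservable above, repeated so this compiles standalone.
internal class AnonymousObservable<T> : IObservable<T>
{
    private readonly Func<IObserver<T>, IDisposable> subscribeAction;

    public AnonymousObservable(Func<IObserver<T>, IDisposable> subscribeAction)
    {
        this.subscribeAction = subscribeAction;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return subscribeAction(observer);
    }
}

// Hypothetical helper: an observer that appends every notification to a log.
internal class RecordingObserver<T> : IObserver<T>
{
    private readonly List<string> log;
    public RecordingObserver(List<string> log) { this.log = log; }
    public void OnNext(T value) { log.Add("OnNext(" + value + ")"); }
    public void OnError(Exception error) { log.Add("OnError(" + error.Message + ")"); }
    public void OnCompleted() { log.Add("OnCompleted"); }
}

// Hypothetical helper: a subscription handle with nothing to clean up.
internal class EmptyDisposable : IDisposable
{
    public void Dispose() { }
}

public static class Demo
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // Nothing inside this lambda runs until Subscribe is called.
        var numbers = new AnonymousObservable<int>(observer =>
        {
            log.Add("subscribe action invoked");
            observer.OnNext(1);
            observer.OnNext(2);
            observer.OnCompleted();
            return new EmptyDisposable();
        });

        log.Add("observable created");
        numbers.Subscribe(new RecordingObserver<int>(log));
        return log;
    }

    public static void Main()
    {
        foreach (var line in Run())
        {
            Console.WriteLine(line);
        }
    }
}
```

Running it logs "observable created" before "subscribe action invoked", followed by the two OnNext notifications and OnCompleted. Creating the observable does no work; each call to Subscribe runs the delegate afresh.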
Now that we've got our handy, reusable AnonymousObservable class we're ready to create our GetDownloadString extension method.
    public static class WebClientExtensions
    {
        // No need to put Async in the method title, it's implicit given that
        // an observable is being returned.
        public static IObservable<string> GetDownloadString(this WebClient client, Uri address)
        {
            // Delay action by nesting it in an observable.
            // Nothing should ever happen until a client subscribes to the
            // observable, just as nothing should happen until an
            // IEnumerable is traversed.
            return new AnonymousObservable<string>(
                observer =>
                {
                    // Several downloads may be going on simultaneously.
                    // The token allows us to establish that we're retrieving
                    // the right one.
                    Guid token = Guid.NewGuid();

                    var stringDownloaded = Observable.FromEvent<DownloadStringCompletedEventArgs>(client, "DownloadStringCompleted")
                        // Confirm it's our download using the captured state variable.
                        .Where(evt => ((Guid)evt.EventArgs.UserState) == token)
                        // Implicitly unhooks the handler after the event is received.
                        .Take(1);

                    bool errorOccurred = false;

                    // Subscribe to the IObservable. Under the hood Rx
                    // creates an anonymous observer class that invokes
                    // the three actions passed to the Subscribe method.
                    IDisposable unsubscribe = stringDownloaded.Subscribe(
                        // OnNext action
                        ev =>
                        {
                            // Propagate the exception if one is
                            // reported. After this, all observers
                            // will continue to propagate this
                            // exception via the OnError method call
                            // until it is caught.
                            if (ev.EventArgs.Error != null)
                            {
                                errorOccurred = true;
                                observer.OnError(ev.EventArgs.Error);
                            }
                            else if (!ev.EventArgs.Cancelled)
                            {
                                observer.OnNext(ev.EventArgs.Result);
                            }
                        },
                        // OnError action (propagate exception)
                        ex => observer.OnError(ex),
                        // OnCompleted action
                        () =>
                        {
                            // No need to call OnCompleted if
                            // there has been an exception.
                            // It is implicit that there will
                            // be no more data.
                            if (!errorOccurred)
                            {
                                observer.OnCompleted();
                            }
                        });

                    client.DownloadStringAsync(address, token);

                    return unsubscribe;
                });
        }
    }
Granted, converting the event-based asynchronous pattern to IObservable requires a little bit of code, but the complexity is pushed onto the library developer. Look at how easy it is to consume the IObservable API.
    var webClient = new WebClient();
    webClient
        .GetDownloadString(new Uri("http://www.jeffwilcox.com"))
        .Subscribe(html => Debug.WriteLine(html));
No need to fuss with event argument objects. The IObservable/IObserver objects take on the concerns of cancelling and propagating exceptions. The developer is given exactly what he or she expects: an asynchronously downloaded string containing the contents of a web page.
Building Complex Queries by Combining Asynchronous Operations
Now that we've got our handy GetDownloadString extension method we can build some pretty complex queries with it. Let's use it to download the HTML from the first ten results pages of a Bing search at once.
    public static class EnumerableExtensions
    {
        // This is a very handy method missing from Linq.
        public static IEnumerable<T> Iterate<T>(T initialValue, Func<T, T> next)
        {
            yield return initialValue;
            while (true)
            {
                initialValue = next(initialValue);
                yield return initialValue;
            }
        }
    }

    // snip...

    var client = new WebClient();
    var uri = "http://www.bing.com/search?q=Rx+framework&first={0}&FORM=PERE3";
    var pageSize = 10;
    var resultIndices = EnumerableExtensions.Iterate(1, prev => prev + pageSize).Take(10);

    var pageDownloads = resultIndices
        .Select(
            resultIndex => client
                .GetDownloadString(new Uri(string.Format(uri, resultIndex)))
                // Append the result index to the results so we can sort by it later.
                .Select(html => new { ResultIndex = resultIndex, Html = html }))
        // Merge all of the observables into one so that we can subscribe to all of
        // them simultaneously.
        .Aggregate(
            (simultaneousDownloads, currentDownload) =>
                Observable.Merge(simultaneousDownloads, currentDownload));

    var concatenatedHtml = pageDownloads
        // Convert to a blocking enumerable. It makes no sense for
        // IObservable to have a GroupBy or OrderBy method because
        // both these methods must block until all the data is received.
        // However all of the downloads will begin simultaneously when
        // we traverse the IEnumerable.
        .ToEnumerable()
        // Order by the result index.
        .OrderBy(pageDownload => pageDownload.ResultIndex)
        // Grab the HTML.
        .Select(pageDownload => pageDownload.Html)
        // Concatenate the HTML strings. On an IEnumerable, Aggregate
        // already returns the single resulting string.
        .Aggregate((accumulatedHtml, html) => accumulatedHtml + html);
Pretty slick, right? There's only one problem: this code doesn't work :-(. It will block forever. When it is started from a thread with a synchronization context (such as the UI thread), the event-based asynchronous pattern raises its completion event back on that same thread, and that thread is blocked by the call to GetEnumerator().
In the past, returning on the UI thread was convenient because developers didn't have to worry about cross-thread access. With Rx, hopping threads is so easy that cross-thread access isn't much of a concern.
    var uiContext = SynchronizationContext.Current;
    AsyncAction.Post(uiContext).Subscribe(() => Debug.Write("Now I'm back on the UI thread!"));
Unfortunately, the fact that the event-based async pattern hops to the UI thread makes it much more difficult to compose a set of asynchronous operations together and block on the result. This is why it's important to always provide a Begin/End version of your event-based asynchronous APIs.
WebClient doesn't provide a Begin/End overload for DownloadStringAsync, so to work around the blocking problem we'll have to create a non-blocking OrderBy method for IObservable.
    public static class ObservableExtensions
    {
        public static IObservable<T> OrderBy<T, R>(this IObservable<T> that, Func<T, R> keySelector)
        {
            return new AnonymousObservable<T>(
                observer =>
                {
                    var orderBySubject = new OrderBySubject<T, R>(keySelector);
                    var unsubscribe = orderBySubject.Subscribe(observer);
                    that.Subscribe(orderBySubject);
                    return unsubscribe;
                });
        }

        public class OrderBySubject<T, R> : Subject<T>
        {
            private Func<T, R> keySelector { get; set; }

            public OrderBySubject(Func<T, R> keySelector)
            {
                this.keySelector = keySelector;
            }

            List<T> list = new List<T>();

            // Buffer every value until the source completes.
            public override void OnNext(T value)
            {
                list.Add(value);
            }

            // Once the source completes, emit the buffered values in key order
            // and only then signal completion.
            public override void OnCompleted()
            {
                foreach (var item in list.OrderBy(keySelector))
                {
                    base.OnNext(item);
                }
                base.OnCompleted();
            }
        }
    }
Now we can rewrite the code above asynchronously.
    // snip...

    var concatenatedHtml = pageDownloads
        // Order by the result index.
        .OrderBy(pageDownload => pageDownload.ResultIndex)
        // Grab the HTML.
        .Select(pageDownload => pageDownload.Html)
        // Concatenate the HTML strings.
        .Aggregate((accumulatedHtml, html) => accumulatedHtml + html)
        // Write the first (and only) result.
        .Subscribe(html => Debug.Write(html));
Of course, I could've avoided the blocking problem by using the WebRequest object because it has Begin/End methods. However, the point of this article was to show that although the event-based asynchronous pattern can be converted to an IObservable, the abstraction leaks.
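For comparison, here is a minimal sketch of wrapping a Begin/End pair in an observable. The callback is tied to the individual call rather than to a shared event, so there is no token to match and no handler to unhook. FromBeginEnd, ActionObserver and EmptyDisposable are hypothetical names invented for this illustration (later Rx releases ship their own Observable.FromAsyncPattern for this purpose), and MemoryStream.BeginRead stands in for a real network call so that the sketch runs without a connection.

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading;

// Same shape as the AnonymousObservable above, repeated so this compiles standalone.
internal class AnonymousObservable<T> : IObservable<T>
{
    private readonly Func<IObserver<T>, IDisposable> subscribeAction;
    public AnonymousObservable(Func<IObserver<T>, IDisposable> subscribeAction) { this.subscribeAction = subscribeAction; }
    public IDisposable Subscribe(IObserver<T> observer) { return subscribeAction(observer); }
}

// Hypothetical helper: an observer built from three delegates.
internal class ActionObserver<T> : IObserver<T>
{
    private readonly Action<T> onNext;
    private readonly Action<Exception> onError;
    private readonly Action onCompleted;

    public ActionObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        this.onNext = onNext;
        this.onError = onError;
        this.onCompleted = onCompleted;
    }

    public void OnNext(T value) { onNext(value); }
    public void OnError(Exception error) { onError(error); }
    public void OnCompleted() { onCompleted(); }
}

// Hypothetical helper: a subscription handle with nothing to clean up.
internal class EmptyDisposable : IDisposable
{
    public void Dispose() { }
}

public static class AsyncPatternSketch
{
    // Hypothetical helper: turns a Begin/End pair into a single-value observable.
    public static IObservable<T> FromBeginEnd<T>(
        Func<AsyncCallback, object, IAsyncResult> begin,
        Func<IAsyncResult, T> end)
    {
        return new AnonymousObservable<T>(observer =>
        {
            // The operation starts only when somebody subscribes.
            begin(asyncResult =>
            {
                T result;
                try
                {
                    // End rethrows any exception raised by the operation.
                    result = end(asyncResult);
                }
                catch (Exception ex)
                {
                    observer.OnError(ex);
                    return;
                }
                observer.OnNext(result);
                observer.OnCompleted();
            }, null);

            // This sketch does not support cancellation.
            return new EmptyDisposable();
        });
    }

    public static string Run()
    {
        var stream = new MemoryStream(Encoding.UTF8.GetBytes("hello"));
        var buffer = new byte[5];
        string text = null;
        var done = new ManualResetEvent(false);

        FromBeginEnd<int>(
            (callback, state) => stream.BeginRead(buffer, 0, buffer.Length, callback, state),
            stream.EndRead)
            .Subscribe(new ActionObserver<int>(
                bytesRead => text = Encoding.UTF8.GetString(buffer, 0, bytesRead),
                ex => { text = "error: " + ex.Message; done.Set(); },
                () => done.Set()));

        // Blocking here is safe: the callback does not need the calling thread.
        done.WaitOne();
        return text;
    }

    public static void Main()
    {
        Console.WriteLine(Run()); // prints "hello"
    }
}
```

Because the callback never has to be marshalled back to the calling thread, waiting on the result does not deadlock the way the event-based version does.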
Think Ahead When Designing Asynchronous APIs
If Rx is embraced (which I expect it will be), existing event-based asynchronous APIs will be more cumbersome to work with than those written using the alternative Begin/End pattern. I ask developers to consider this when designing asynchronous APIs today.