In part 1 we talked about how to convert events to IObservables. However, Rx isn't just about querying events; it's also about querying asynchronous operations.
There are two common asynchronous patterns in the .NET framework: the event-based asynchronous pattern and the Begin/End pattern based on IAsyncResult (Begin/EndInvoke). Currently MSDN recommends using the event-based asynchronous pattern where possible. This recommendation may or may not change when IObservable is released with .NET 4.0. I want to make clear that the opinions expressed in this post are my own personal views and are not consistent with official MSDN guidance.
Don't use the event-based asynchronous pattern. Or, if you must use it, be sure to also provide a Begin/EndInvoke version of the API that uses IAsyncResult. Why? Because the event-based asynchronous pattern is deeply flawed. To understand why, let's examine a typical piece of code that uses it.
    Guid token = Guid.NewGuid();
    var webClient = new WebClient();

    // We need to refer to the identifier within the body
    // of the method so we must first initialize it to null.
    // This means we can't use type inference to avoid
    // having to clutter our code with this absurdly long
    // (and entirely unnecessary) delegate type.
    DownloadStringCompletedEventHandler handler = null;
    handler = (o, a) =>
    {
        if (((Guid)a.UserState) != token) return;

        // unhook from the event so that we don't keep firing
        // after we've gotten our data asynchronously.
        webClient.DownloadStringCompleted -= handler;

        if (a.Error != null)
        {
            // Handle exception. This may include throwing but
            // we may also have to invent some method of manually
            // propagating it if we don't have the knowledge to
            // handle it at this point and we are in the middle of several
            // other asynchronous operations.
        }
        else if (!a.Cancelled)
        {
            // Only read Result when there was no error: accessing it
            // after a failed download throws.
            Debug.WriteLine(string.Format("The downloaded HTML is {0}.", a.Result));
        }
    };

    // Notice that the only link between the method and the event that returns
    // its data is a simple naming convention. There's no way to know for _sure_
    // which event will return a method's data.
    webClient.DownloadStringCompleted += handler;
    webClient.DownloadStringAsync(new Uri("http://www.jeffwilcox.com"), token);
A thoroughly awful piece of code, no? The good news is that by adding an extension method to the WebClient class we can wrap the event-based asynchronous pattern in an IObservable. Using Rx works around all of the issues pointed out in the comments above.
Of course it's rather cumbersome to create a new class that implements IObservable whenever we need to return the result of an asynchronous operation. It also seems a little silly given that IObservable only has one method: Subscribe. Given this fact, it's helpful to create an AnonymousObservable class which accepts a function and invokes it when the Subscribe method is called.
    internal class AnonymousObservable<T> : IObservable<T>
    {
        private Func<IObserver<T>, IDisposable> subscribeAction;

        public AnonymousObservable(Func<IObserver<T>, IDisposable> subscribeAction)
        {
            this.subscribeAction = subscribeAction;
        }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            return subscribeAction(observer);
        }
    }
Now that we've got our handy, reusable AnonymousObservable class we're ready to create our GetDownloadString extension method.
    public static class WebClientExtensions
    {
        // No need to put Async in the method title, it's implicit given that
        // an observable is being returned.
        public static IObservable<string> GetDownloadString(this WebClient client, Uri address)
        {
            // Delay action by nesting it in an observable.
            // Nothing should ever happen until a client subscribes to the
            // observable, just as nothing should happen until an
            // IEnumerable is traversed.
            return new AnonymousObservable<string>(
                observer =>
                {
                    // Several downloads may be going on simultaneously.
                    // The token allows us to establish that we're retrieving
                    // the right one.
                    Guid token = Guid.NewGuid();

                    var stringDownloaded =
                        Observable.FromEvent<DownloadStringCompletedEventArgs>(client, "DownloadStringCompleted")
                            // Confirm it's our download using captured state variable
                            .Where(evt => ((Guid)evt.EventArgs.UserState) == token)
                            .Take(1); // implicitly unhooks handler after event is received

                    bool errorOccurred = false;

                    // Subscribe to the IObservable. Under the hood Rx
                    // creates an anonymous observer class that invokes
                    // the three actions passed to the Subscribe method.
                    IDisposable unsubscribe = stringDownloaded.Subscribe(
                        // OnNext action
                        ev =>
                        {
                            // Propagate the exception if one is
                            // reported. After this, all observers
                            // will continue to propagate this
                            // exception via the OnError method call
                            // until it is caught.
                            if (ev.EventArgs.Error != null)
                            {
                                errorOccurred = true;
                                observer.OnError(ev.EventArgs.Error);
                            }
                            else if (!ev.EventArgs.Cancelled)
                            {
                                observer.OnNext(ev.EventArgs.Result);
                            }
                        },
                        // OnError action (propagate exception)
                        ex => observer.OnError(ex),
                        // OnCompleted action
                        () =>
                        {
                            // No need to call OnCompleted if
                            // there has been an exception.
                            // It is implicit that there will
                            // be no more data.
                            if (!errorOccurred)
                            {
                                observer.OnCompleted();
                            }
                        });

                    client.DownloadStringAsync(address, token);

                    return unsubscribe;
                });
        }
    }
Granted, converting the event-based asynchronous pattern to IObservable requires a little bit of code, but the complexity is pushed onto the library developer. Look at how easy it is to consume the IObservable API.
    var webClient = new WebClient();
    webClient
        .GetDownloadString(new Uri("http://www.jeffwilcox.com"))
        .Subscribe(html => Debug.WriteLine(html));
No need to fuss with event argument objects. The IObservable/IObserver objects take on the concerns of cancelling and propagating exceptions. The developer is given exactly what he or she expects: an asynchronously downloaded string containing the contents of a web page.
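For completeness, here is a minimal sketch of a consumer that also observes failures and completion, and that keeps the subscription handle so it can detach later. It assumes the GetDownloadString extension method defined above and the three-callback Subscribe overload that Rx provides (the same overload used inside GetDownloadString). The DownloadExample class name and the messages are illustrative only.

```csharp
using System;
using System.Diagnostics;
using System.Net;

// Illustrative class name; not part of Rx or of the code in this post.
public static class DownloadExample
{
    public static IDisposable Start()
    {
        var webClient = new WebClient();

        // GetDownloadString is the extension method from this post.
        // Subscribe with three callbacks is the Rx overload, so this
        // assumes a reference to the Rx assembly.
        IDisposable subscription = webClient
            .GetDownloadString(new Uri("http://www.jeffwilcox.com"))
            .Subscribe(
                html => Debug.WriteLine(html),
                ex => Debug.WriteLine("Download failed: " + ex.Message),
                () => Debug.WriteLine("Download finished."));

        // Disposing the handle detaches the observer. With the
        // GetDownloadString implementation above it unhooks the event
        // handler; it does not cancel the underlying request.
        return subscription;
    }
}
```

The caller never touches DownloadStringCompletedEventArgs: an error arrives through the second callback, and a cancelled download simply completes without producing a value.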
Building Complex Queries by Combining Asynchronous Operations
Now that we've got our handy GetDownloadString extension method we can build some pretty complex queries with it. Let's use it to download the HTML from the first ten results pages of a Bing search at once.
    public static class EnumerableExtensions
    {
        // This is a very handy extension method missing from Linq.
        public static IEnumerable<T> Iterate<T>(T initialValue, Func<T, T> next)
        {
            yield return initialValue;
            while (true)
            {
                initialValue = next(initialValue);
                yield return initialValue;
            }
        }
    }

    // snip...

    var client = new WebClient();
    var uri = "http://www.bing.com/search?q=Rx+framework&first={0}&FORM=PERE3";
    var pageSize = 10;
    var resultIndices = EnumerableExtensions.Iterate(1, prev => prev + pageSize).Take(10);

    var pageDownloads = resultIndices
        .Select(
            resultIndex => client
                .GetDownloadString(new Uri(string.Format(uri, resultIndex)))
                // append the result index to the results so we can sort by it later
                .Select(html => new { ResultIndex = resultIndex, Html = html }))
        // Merge all of the observables into one so that we can subscribe to all of
        // them simultaneously.
        .Aggregate(
            (simultaneousDownloads, currentDownload) =>
                Observable.Merge(simultaneousDownloads, currentDownload));

    var concatenatedHtml = pageDownloads
        // Convert to a blocking enumerable. It makes no sense for
        // IObservable to have a GroupBy or OrderBy method because
        // both these methods must block until all the data is received.
        // However all of the downloads will begin simultaneously when
        // we traverse the IEnumerable
        .ToEnumerable()
        // Order by the result index
        .OrderBy(pageDownload => pageDownload.ResultIndex)
        // Grab the HTML
        .Select(pageDownload => pageDownload.Html)
        // Concatenate the HTML strings. Enumerable.Aggregate already returns
        // the single concatenated string, so no First() call is needed.
        .Aggregate((accumulatedHtml, html) => accumulatedHtml + html);
Pretty slick, right? There's only one problem. This code doesn't work :-(. It will block forever. The problem is that the event-based asynchronous pattern always raises its completion event on the thread the operation was started from (the UI thread here), and that thread is blocked traversing the enumerable (the call to GetEnumerator() and the MoveNext() calls that follow).
In the past, returning on the UI thread was convenient because developers didn't have to worry about cross-thread accesses. With Rx, hopping threads is so easy that cross-thread access isn't much of a concern.
    var uiContext = SynchronizationContext.Current;
    AsyncAction.Post(uiContext).Subscribe(() => Debug.Write("Now I'm back on the UI thread!"));
Unfortunately, the fact that the event-based async pattern hops to the UI thread makes it much more difficult to compose a set of asynchronous operations together and block on the result. This is why it's important to always provide a Begin/End Invoke version of your event-based asynchronous APIs.
WebClient doesn't provide a Begin/End overload for DownloadStringAsync, so to work around the blocking problem we'll have to create a non-blocking OrderBy method for IObservable.
    public static class ObservableExtensions
    {
        public static IObservable<T> OrderBy<T, R>(this IObservable<T> that, Func<T, R> keySelector)
        {
            return new AnonymousObservable<T>(
                observer =>
                {
                    var orderBySubject = new OrderBySubject<T, R>(keySelector);
                    var unsubscribe = orderBySubject.Subscribe(observer);
                    that.Subscribe(orderBySubject);
                    return unsubscribe;
                });
        }

        public class OrderBySubject<T, R> : Subject<T>
        {
            private Func<T, R> keySelector { get; set; }

            public OrderBySubject(Func<T, R> keySelector)
            {
                this.keySelector = keySelector;
            }

            // Buffer every value until the source completes.
            List<T> list = new List<T>();

            public override void OnNext(T value)
            {
                list.Add(value);
            }

            public override void OnCompleted()
            {
                foreach (var item in list.OrderBy(keySelector))
                {
                    base.OnNext(item);
                }

                // Complete once, after every buffered item has been emitted.
                base.OnCompleted();
            }
        }
    }
Now we can rewrite the code above asynchronously.
    // snip...

    var concatenatedHtml = pageDownloads
        // Order by the result index
        .OrderBy(pageDownload => pageDownload.ResultIndex)
        // Grab the HTML
        .Select(pageDownload => pageDownload.Html)
        // Concatenate the HTML strings
        .Aggregate((accumulatedHtml, html) => accumulatedHtml + html)
        // Write the first (and only) result.
        .Subscribe(html => Debug.Write(html));
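The OrderBy operator above relies on being able to override members of Subject<T>. As a hedged alternative, the same buffering idea can be sketched with a plain IObserver<T>, which depends only on the IObservable/IObserver interfaces and on the AnonymousObservable<T> class from earlier in this post. The names OrderByBuffered and SortingObserver are hypothetical, and the sketch assumes the source delivers its notifications one at a time.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class BufferedOrderByExtensions
{
    // Hypothetical name, chosen so it does not clash with the OrderBy above.
    public static IObservable<T> OrderByBuffered<T, TKey>(
        this IObservable<T> source, Func<T, TKey> keySelector)
    {
        // AnonymousObservable<T> is the class defined earlier in this post.
        // Nothing is subscribed until the returned observable is subscribed to.
        return new AnonymousObservable<T>(observer =>
            source.Subscribe(new SortingObserver<T, TKey>(observer, keySelector)));
    }

    private sealed class SortingObserver<T, TKey> : IObserver<T>
    {
        private readonly IObserver<T> downstream;
        private readonly Func<T, TKey> keySelector;
        private readonly List<T> buffer = new List<T>();

        public SortingObserver(IObserver<T> downstream, Func<T, TKey> keySelector)
        {
            this.downstream = downstream;
            this.keySelector = keySelector;
        }

        // Hold every value back until the source completes.
        public void OnNext(T value)
        {
            buffer.Add(value);
        }

        // Errors pass straight through; buffered values are dropped.
        public void OnError(Exception error)
        {
            downstream.OnError(error);
        }

        // Emit the buffered values in key order, then complete exactly once.
        public void OnCompleted()
        {
            foreach (T item in buffer.OrderBy(keySelector))
            {
                downstream.OnNext(item);
            }
            downstream.OnCompleted();
        }
    }
}
```

Because the disposable returned by source.Subscribe is handed back to the caller, unsubscribing from the ordered observable also detaches it from the source.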
Of course, I could've avoided the blocking problem by using the WebRequest object because it has Begin/End methods. However, the point of this article was to show that although the event-based asynchronous pattern can be converted to an IObservable, the abstraction leaks.
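To make that alternative concrete, here is a rough sketch of wrapping WebRequest's BeginGetResponse/EndGetResponse pair in an observable. It reuses the AnonymousObservable<T> class from earlier in this post. GetResponseString and Subscription are hypothetical names introduced for illustration. The sketch assumes the returned observable is subscribed to only once per request, since a WebRequest instance can only be started once.

```csharp
using System;
using System.IO;
using System.Net;

public static class WebRequestExtensions
{
    // GetResponseString is a hypothetical name, not a BCL or Rx method.
    public static IObservable<string> GetResponseString(this WebRequest request)
    {
        // AnonymousObservable<T> is the class defined earlier in this post.
        return new AnonymousObservable<string>(observer =>
        {
            var subscription = new Subscription(request);

            // The callback is not marshalled back to the thread that
            // started the request, so a blocked caller cannot starve it.
            request.BeginGetResponse(asyncResult =>
            {
                string body;
                try
                {
                    using (WebResponse response = request.EndGetResponse(asyncResult))
                    using (var reader = new StreamReader(response.GetResponseStream()))
                    {
                        // Synchronous read on the callback thread, for brevity.
                        body = reader.ReadToEnd();
                    }
                }
                catch (Exception ex)
                {
                    if (!subscription.IsDisposed) observer.OnError(ex);
                    return;
                }

                if (subscription.IsDisposed) return;
                observer.OnNext(body);
                observer.OnCompleted();
            }, null);

            return subscription;
        });
    }

    // Aborts the request and silences further notifications when disposed.
    private sealed class Subscription : IDisposable
    {
        private readonly WebRequest request;
        private volatile bool isDisposed;

        public Subscription(WebRequest request)
        {
            this.request = request;
        }

        public bool IsDisposed
        {
            get { return isDisposed; }
        }

        public void Dispose()
        {
            isDisposed = true;
            request.Abort();
        }
    }
}
```

Unlike the WebClient wrapper, no token or event filtering is needed here: the callback passed to BeginGetResponse belongs to exactly one operation.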
Think Ahead When Designing Asynchronous APIs
If Rx is embraced (which I expect it will be) existing event-based asynchronous APIs will be more cumbersome to work with than those written using the alternative Begin/End Invoke pattern. I ask developers to consider this when designing asynchronous APIs today.
 

13 comments:
Am I the only one who finds this code extremely convoluted? I just hope that if something new is being promoted that it really is an improvement over the current method and that it's easy to use. You guys really seem to be getting gung ho with the extension methods. I thought the recommendation was to use them sparingly. Also, you seem to be using var all over the place which as far as I knew was frowned upon from a code readability perspective. Also, the new way of doing it takes a lot more code. Note, there will be developers out there developing libraries also, not just making use of them. My initial reaction to this is that it's confusing as hell.
i agree that AnonymousObservable is useful :) it's actually in the dll but for some reason it's marked as internal! why oh why is this? it's like with TreeVisitorBase, an extremely useful class that is made internal so we have to write the same (in this case) trivial code over and over again for no reason :( there are a number of other classes in system.reactive that are also marked as internal... :( that is just so annoying.. please change that before rtm.
other than that Rx is really awesome :)
btw your iterate method is nice but it'd be even better if it looked like this i think :)
IEnumerable< R > IterateFrom< T , R >(this T, Func< T , R > Next)
Hi Jon,
Do you find Linq code convoluted and unreadable? Keep in mind that the extension methods on IObservable are basically the same as those that already exist on IEnumerable. If I were computing streams of data synchronously the code would look almost identical.
Your criticism of my var usage is fair enough. I should probably stick to explicit type declarations in blog posts for clarity. I don't know that there's any specific framework guidance on this though.
As for your claim that Rx requires more code I strongly disagree. An async API written from scratch is more concise to write in Rx than with the EBAP. Additionally any code savings you do get from the EBAP are simply pushed onto the library consumer.
My suspicion is that you are new to functional programming in general. I had a similar reaction to queries when I was first introduced to them. There is nothing inherently convoluted about them (in fact quite the contrary), I'm guessing they are simply not structured the way you are used to.
Hi aL.
I don't own the Rx framework and even if I did AnonymousObservable does not sound like something I'd expose. It's easy to write and it has the danger of being seen as prescriptive.
Don't get me wrong. I sympathize with frustration about internals. They are a necessary evil though. Copy/paste is your friend :-).
As for your Iterate method...I have a sneaking suspicion it won't work :-). That's one of the wonderful things about static typing, you can tell a lot about what a function can do (or not do in this case) simply by examining its type. ;-) Iterate, like most Linq combinators, is actually stolen from Haskell. Not sure why it didn't make it into System.Linq though.
Hello, Jafar -
First off, I agree that EBAP components should also expose an IAsyncResult API if possible. I teach asynchronous component design, and I've always advocated this.
However, two other solutions spring immediately to my mind regarding this problem.
The first is to use a separate thread with its own SyncContext. This is not uncommon, since it is necessary when using EBAPs on child threads, in console programs, or windows services (and also in some parent/child scenarios, where an EBAP depends on other EBAPs).
The second is simpler: use a temporary substitute default SyncContext (using IDisposable to reset the original SyncContext). However, this does not work as well if anything more complex is done when evaluating the enumerator (e.g., updating a GUI element).
-Steve
(Boy this takes me back to learning lambda calculus at uni!)
It sure is a mind shift to get to functional programming from where I am now.
It's going to be a challenge, but I suspect you're right; this is too powerful a tool to ignore and I think the take-up will be slow but huge.
Carl.
@Jon - It does start out more convoluted and a bit more complicated.
However I have found that the way it is set up is seriously generic.
I converted David Betz Silverlight 2 WCF client to using this and it is not that much more code, but using it is much easier and straight forward.
When I was first converting it I felt the same as you did "What huh, why?" but once I finished it and realized what was going on, making a mock version for unit testing was trivial since so much of the setup code is shared from implementation to implementation.
I don't know if you knew it or if I misunderstood something, but if AnonymousObservable is in fact internal, you can easily get an instance of it using the static factory method System.Linq.Observable.Create(subscribe)
Furthermore, I would say that a non-blocking version of the OrderBy function is a lot more interesting than a blocking one ^_^
Very interesting post!
I tend to agree with Jon, and it is because I have not been able to embrace LINQ 100%. While I actually enjoyed watching Erik Meijer's Channel 9 video on the dualism of Enumerable and Observable, you still have to have drunk the Kool-Aid, so to speak, to be so enthusiastic. As I see it, C# is a pretty elegant language, and LINQ is a new language within it which is far less elegant. It was wedged in there to accomplish the task of making type safe calls to SQL. I remember coding in C in the 80s, and when C++ came along, it was, like, ah-ha! That is what I was looking for! This is not happening with LINQ for me. I think it is because of the heavy dependence on lambda expressions. You're looking at a pseudo select statement, but instead of selecting stuff, you're firing off code (and it is hard to put your finger on when). If I am reading classic C# with defined delegates, I know what is happening and when it will happen. I know where to set my breakpoints, and no (or very little) code is firing underneath me.
This is now compounded with the burying of reactive programming into the same language. We have been coding reactively since the Mac came out. Sure it was hard at first (especially coming from DOS), but over time we got it to make sense (for example, with a design surface and event processing). You can hand an entry level programmer some code, and they can make sense of it. Now all of a sudden we need a way to stuff all of it into one line of code. And your examples only do Debug.WriteLines. Imagine when you have to do something hard.
I would be interested in seeing how Jon has come along in the last 6 months, however.
Hi Jafar,
Should the base.OnCompleted(); in the OrderBySubject class be outside the for loop???
Thank You,
Vish