Saturday, August 1, 2009

The Joy of Rx: Building an Asynchronous API with Rx and IQueryable (Updated)

Updated: One of our architects expressed concerns about my original solution to this problem because it required a down-cast to IObservable.  In the process of explaining to him why this was unavoidable I figured out that it was, in fact, avoidable. :-) I've since updated this article with a much more elegant solution.

The IObservable interface is a true Microsoft innovation and a very significant discovery.  Although we've known for a long time that iteration could be done either via push or pull we hadn't figured out how to do push iteration in a composable way until IObservable came along.  It's interesting to speculate about how the .NET framework might've been designed differently if IObservable had been present alongside IEnumerable in .NET 1.0.  Here are a few questions that haunt me...

  1. Rather than introduce the redundant concept of an event into the CLR would we have used properties of type IObservable instead?
  2. Would all asynchronous APIs return an IObservable rather than use one of any number of other patterns?
  3. Would the "yield" keyword in C# work for methods that return IObservable as well as those that return IEnumerable?
  4. Would the ItemsSource property of the Silverlight/WPF ItemsControl (which most data controls derive from or are modeled after) be of type IObservable (or object) rather than IEnumerable?

There's no way of knowing for sure but I like to think the answer to these questions is "yes."  There is yet another question worth asking now with the benefit of hindsight:

If IObservable had been in the BCL all along would IQueryable inherit from IObservable as well as IEnumerable (or neither)?

Probably.  The most common use of IQueryable is to execute query logic on a remote machine.  Because remote calls take a relatively long time it makes sense to make them asynchronously and expose a non-blocking interface.  Ideally, though, IQueryable wouldn't inherit from either IObservable or IEnumerable.  Instead it probably should've had its own implementation of the query operators and provided two conversion methods: ToEnumerable and ToObservable.  Unfortunately this would be a breaking change at this point and is therefore highly unlikely to happen.

"Okay, okay.  You're bumming me out.  How can I traverse an IQueryable asynchronously?"

Before we take a look at an example it might be helpful to have a look at the first part of the excellent "Building a Linq IQueryable Provider" series on the Wayward Weblog.  I'm going to use the code in that article as a starting point.

Asynchronously Querying Yahoo Finance

Yahoo Finance has a REST service that returns stock history in a CSV file.  We'd like to be able to query for stocks using an API like this:

    YahooFinance service = new YahooFinance();
    IQueryable<StockInfo> stockInfos =
        from stock in service.Stocks
        where stock.Name == "MSFT"
        where stock.Date > new DateTime(1999, 1, 1)
        select stock;

First we'll create a simple C# class which will wrap the REST service and expose an asynchronous API. To build it we'll use the GetDownloadString extension method we introduced to WebClient in my last Rx post.

    public class YahooFinanceStockService
    {
        private WebClient client = new WebClient();

        const string uriFormat = "{0}&a={1}&b={2}&c={3}&d={4}&e={5}&f={6}&g=d&ignore=.csv";

        public IObservable<StockInfo> GetStockInfo(string name, DateTime from, DateTime to)
        {
            return client.GetDownloadString(
                new Uri(
                    string.Format(
                        uriFormat,
                        name,
                        from.Day, from.Month, from.Year,
                        to.Day, to.Month, to.Year)))
                .SelectMany(
                    text =>
                        (
                            // retrieve each line in the CSV file
                            from line in text
                                // remove trailing line break at end of file
                                .Trim()
                                .Split('\n')
                                // skip header line
                                .Skip(1)
                            let fields = line.Split(',')
                            select new StockInfo
                            {
                                Name = name,
                                Date = DateTime.Parse(fields[0]),
                                Open = double.Parse(fields[1]),
                                High = double.Parse(fields[2]),
                                Low = double.Parse(fields[3]),
                                Close = double.Parse(fields[4]),
                                Volume = int.Parse(fields[5]),
                                AdjustedClose = double.Parse(fields[6])
                            }
                        ).ToObservable());
        }
    }

Next we'll create a YahooFinanceStocksQueryProvider that derives from the QueryProvider class introduced in the Wayward Weblog series.  The query provider interprets the query expression, makes one or more calls to the service's GetStockInfo method, and returns a single IObservable when its Execute method is called.

    public class YahooFinanceStocksQueryProvider : QueryProvider
    {
        private YahooFinanceStockService service = new YahooFinanceStockService();

        public override string GetQueryText(Expression expression)
        {
            return "YahooFinance.Stocks";
        }

        public override object Execute(Expression expression)
        {
            /* Normally this would contain generic logic to
             * parse the query and make the appropriate call to the
             * service. That's outside of the scope of this article so
             * we'll just make a call to the service with constant data.
             */
            return service.GetStockInfo("MSFT", new DateTime(1999, 1, 1), DateTime.Now);
        }
    }
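For the curious, here is a rough sketch of what that parsing logic might look like. It is not part of the original provider: StockQueryParser and the cut-down StockInfo class are hypothetical names made up for this illustration, and it assumes the public ExpressionVisitor class from System.Linq.Expressions (.NET 4 and later) rather than the visitor from the Wayward Weblog series. It only understands comparisons of the form stock.Name == value and stock.Date > value.

```csharp
using System;
using System.Linq.Expressions;

// Hypothetical, cut-down stand-in for the article's StockInfo class.
public class StockInfo
{
    public string Name { get; set; }
    public DateTime Date { get; set; }
}

// Hypothetical visitor that pulls the stock name and start date out of a filter.
public sealed class StockQueryParser : ExpressionVisitor
{
    public string Name { get; private set; }
    public DateTime From { get; private set; }

    protected override Expression VisitBinary(BinaryExpression node)
    {
        // Only look at comparisons whose left side is a property of the
        // lambda parameter, e.g. stock.Name or stock.Date.
        var member = node.Left as MemberExpression;
        if (member != null && member.Expression is ParameterExpression)
        {
            // Evaluate the right side (a constant, or something like
            // new DateTime(...)). This assumes the right side does not
            // itself refer to the lambda parameter.
            object value = Expression.Lambda(node.Right).Compile().DynamicInvoke();

            if (member.Member.Name == "Name" && node.NodeType == ExpressionType.Equal)
                Name = (string)value;
            else if (member.Member.Name == "Date" && node.NodeType == ExpressionType.GreaterThan)
                From = (DateTime)value;
        }

        // Keep walking so that nested conditions (a && b) are visited too.
        return base.VisitBinary(node);
    }
}

public static class StockQueryParserDemo
{
    public static StockQueryParser Run()
    {
        Expression<Func<StockInfo, bool>> filter =
            stock => stock.Name == "MSFT" && stock.Date > new DateTime(1999, 1, 1);

        var parser = new StockQueryParser();
        parser.Visit(filter);
        return parser;
    }
}
```

After Run() the parser's Name is "MSFT" and From is January 1, 1999, which is exactly the information Execute would need in order to call GetStockInfo with real arguments instead of constants.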

Now that we have a query provider let's create a query object that we can expose.  We'll start with the Query class from the Wayward Weblog series but we'll make a few small changes:

  1. We'll rename the class from Query to ObservableQuery.
  2. In addition to IQueryable our ObservableQuery class will also implement IObservable.
  3. If the ObservableQuery object is traversed synchronously (foreach) we'll convert the IObservable object returned by the query provider to an IEnumerable.

    public class ObservableQuery<T> :
        IQueryable<T>, IQueryable,
        IEnumerable<T>, IEnumerable,
        IOrderedQueryable<T>, IOrderedQueryable,
        IObservable<T>
    {
        internal QueryProvider provider;
        internal Expression expression;

        // snip...

        public IEnumerator<T> GetEnumerator()
        {
            return ((IObservable<T>)this.provider.Execute(this.expression))
                .ToEnumerable()
                .GetEnumerator();
        }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            return ((IObservable<T>)this.provider.Execute(this.expression))
                .Subscribe(observer);
        }
    }

"Hold on a second.  Why bother to change the query provider to return an IObservable when we can just do the reverse?  We could just convert the IEnumerable to an IObservable if our Query object was traversed asynchronously?"

We don't want to do this because converting an asynchronous operation to an IEnumerable uses a precious resource: a thread.  Let's say our query provider makes several calls to our web service, one for each stock in the query.  We would hope that it would make these calls concurrently.  If the query provider had to return an IEnumerable it would need to block and wait for the results from the worker threads.  If we turn around and immediately convert that IEnumerable into an IObservable a thread is completely wasted. Therefore to conserve threads we should avoid converting an IObservable to an IEnumerable until we absolutely have to.
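To make the cost concrete, here is a minimal sketch of what any IObservable-to-IEnumerable conversion has to do. This is not the Rx implementation of ToEnumerable; ToBlockingEnumerable and BackgroundRange are hypothetical names invented for this example, and it uses only BCL types (BlockingCollection requires .NET 4 or later).

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not the Rx implementation of ToEnumerable.
public static class BlockingConversion
{
    private sealed class QueueObserver<T> : IObserver<T>
    {
        private readonly BlockingCollection<T> queue;
        public Exception Error;

        public QueueObserver(BlockingCollection<T> queue) { this.queue = queue; }

        public void OnNext(T value) { queue.Add(value); }
        public void OnError(Exception error) { Error = error; queue.CompleteAdding(); }
        public void OnCompleted() { queue.CompleteAdding(); }
    }

    public static IEnumerable<T> ToBlockingEnumerable<T>(this IObservable<T> source)
    {
        var queue = new BlockingCollection<T>();
        var observer = new QueueObserver<T>(queue);

        using (source.Subscribe(observer))
        {
            // The calling thread is parked here whenever the queue is empty
            // and the source has not completed yet.
            foreach (T item in queue.GetConsumingEnumerable())
                yield return item;
        }

        if (observer.Error != null)
            throw observer.Error;
    }
}

// Hypothetical source that pushes 0..count-1 from a worker thread.
public sealed class BackgroundRange : IObservable<int>
{
    private readonly int count;

    public BackgroundRange(int count) { this.count = count; }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        Task.Run(() =>
        {
            for (int i = 0; i < count; i++)
            {
                Thread.Sleep(10); // simulate a slow remote call
                observer.OnNext(i);
            }
            observer.OnCompleted();
        });
        return new NoopDisposable();
    }

    private sealed class NoopDisposable : IDisposable
    {
        public void Dispose() { }
    }
}
```

A foreach over new BackgroundRange(3).ToBlockingEnumerable() yields 0, 1 and 2, but the thread running the loop spends most of its time waiting inside GetConsumingEnumerable.  That waiting thread is the resource we'd rather not spend unless the caller explicitly asks for synchronous iteration.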

Now we're ready to write the Yahoo Finance class that we will query against.

    public class YahooFinance
    {
        public YahooFinance()
        {
            this.Stocks = new ObservableQuery<StockInfo>(new YahooFinanceStocksQueryProvider());
        }

        public ObservableQuery<StockInfo> Stocks { get; internal set; }
    }

Now if we wrote a query against our Yahoo Finance class and compiled the code we would get the compiler error "Multiple implementations of the query pattern were found for source type."  The C# compiler would be confused because ObservableQuery implements both IEnumerable and IObservable and it wouldn't know which set of extension methods to invoke.  The answer is to implement the query pattern for our concrete ObservableQuery class.  Overload resolution prefers an extension method whose first parameter is exactly ObservableQuery over one that requires a conversion to IEnumerable or IObservable, so the ambiguity disappears.

    // Ex is presumably an alias for System.Linq.Expressions.Expression.
    public static class ObservableQuery
    {
        public static ObservableQuery<T> Where<T>(
            this ObservableQuery<T> q,
            Expression<Func<T, bool>> predicateExpression)
        {
            // find this very method via reflection so the call can be recorded
            MethodInfo method =
                typeof(ObservableQuery)
                    .GetMethods()
                    .Where(m => m.Name == "Where" && m.GetParameters().Length == 2)
                    .First();
            method = method.MakeGenericMethod(new Type[] { typeof(T) });

            // append the call to the expression built so far
            var expression = Ex.Call(method, q.expression, predicateExpression);
            return new ObservableQuery<T>(q.provider, expression);
        }

        public static ObservableQuery<R> Select<T, R>(
            this ObservableQuery<T> q,
            Expression<Func<T, R>> selectorExpression)
        {
            MethodInfo method =
                typeof(ObservableQuery)
                    .GetMethods()
                    .Where(m => m.Name == "Select" && m.GetParameters().Length == 2)
                    .First();
            method = method.MakeGenericMethod(new Type[] { typeof(T), typeof(R) });

            var expression = Ex.Call(method, q.expression, selectorExpression);
            return new ObservableQuery<R>(q.provider, expression);
        }

        // etc, etc, etc...
    }
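The way the compiler picks the concrete overload can be seen in isolation with a toy example.  Everything here is hypothetical and exists only for illustration: Both<T> stands in for ObservableQuery<T>, and the three Where methods return a string naming themselves instead of doing any real filtering.

```csharp
using System;
using System.Collections;
using System.Collections.Generic;

// Stand-in for the IEnumerable query operators.
public static class EnumerableOps
{
    public static string Where<T>(this IEnumerable<T> source, Func<T, bool> predicate)
    {
        return "IEnumerable.Where";
    }
}

// Stand-in for the IObservable query operators.
public static class ObservableOps
{
    public static string Where<T>(this IObservable<T> source, Func<T, bool> predicate)
    {
        return "IObservable.Where";
    }
}

// Stand-in for ObservableQuery<T>: implements both interfaces.
public sealed class Both<T> : IEnumerable<T>, IObservable<T>
{
    public IEnumerator<T> GetEnumerator() { yield break; }

    IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        observer.OnCompleted();
        return new NoopDisposable();
    }

    private sealed class NoopDisposable : IDisposable
    {
        public void Dispose() { }
    }
}

// Operators declared against the concrete type. Remove this class and the
// call in Run() no longer compiles because the two overloads above are
// equally good candidates.
public static class BothOps
{
    public static string Where<T>(this Both<T> source, Func<T, bool> predicate)
    {
        return "Both.Where";
    }
}

public static class ResolutionDemo
{
    public static string Run()
    {
        var source = new Both<int>();
        // A query expression with a where clause is translated into
        // a call of this shape.
        return source.Where(x => x > 0);
    }
}
```

ResolutionDemo.Run() returns "Both.Where": passing a Both<int> to a parameter of type Both<T> needs no conversion, which makes that overload a better match than either interface-based one.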

Every time an ObservableQuery extension method is called we append the method call to the expression created so far!  In cases where a method accepts a function as a parameter (e.g. Select, Where, OrderBy) we accept an expression of that function type instead.  This ensures that we don't lose any of the information about the query expression as we're building it.  The end result of our query is an ObservableQuery that knows exactly what expression it must execute.
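If the difference between accepting a function and accepting an expression of that function is unfamiliar, this small sketch may help.  ExpressionVersusDelegate is a hypothetical class written just for this comparison; it uses only System.Linq.Expressions.

```csharp
using System;
using System.Linq.Expressions;

public static class ExpressionVersusDelegate
{
    // Receives the lambda as data, so its structure can be inspected.
    // This sketch assumes the body is a single binary comparison.
    public static string Describe(Expression<Func<int, bool>> predicate)
    {
        var body = (BinaryExpression)predicate.Body;
        return body.NodeType + " " + body.Right;
    }

    // Receives the lambda as compiled code: it can only be invoked.
    public static bool Evaluate(Func<int, bool> predicate, int value)
    {
        return predicate(value);
    }
}
```

The same lambda text produces two very different things.  ExpressionVersusDelegate.Describe(x => x > 5) returns "GreaterThan 5" because the compiler hands over a tree describing the comparison, whereas ExpressionVersusDelegate.Evaluate(x => x > 5, 7) can only run the function and return true.  A query provider needs the first form in order to translate the query into a service call.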

Asynchronously Iterating our Observable Query

Query expressions against an ObservableQuery return a new ObservableQuery - which is an IObservable.  Iterating its results is just a matter of subscribing to it.

    YahooFinance service = new YahooFinance();
    ObservableQuery<StockInfo> stockInfos =
        from stock in service.Stocks
        where stock.Name == "MSFT"
        where stock.Date > new DateTime(1999, 1, 1)
        select stock;

    stockInfos.Subscribe(stock => Debug.WriteLine(stock));

On the other hand if we want to traverse the results synchronously we can use foreach...

    foreach (StockInfo stock in stockInfos)
    {
        Debug.WriteLine(stock);
    }

Asynchronous Queryable Services

That's it.  Hopefully existing query providers will be updated to use this approach so that they can support asynchronous iteration.

About Me

I'm a software developer who started programming at age 16 and never saw any reason to stop. I'm working on the Presentation Platform Controls team at Microsoft. My primary interests are functional programming and Rich Internet Applications.