RX Framework – Building a message bus

I’ve been toying around with the Reactive Extensions (RX) Framework for .NET 4 the last few days and I think I’ve found a quite nice usecase for it;
Since RX is all about sequences of events/messages, it does fit very well together with any sort of message bus or event broker.

Just check this out:

Our in proc message bus:

public class MiniVan
{
    private Subject<object> messageSubject = new Subject<object>();

    public void Send<T>(T message)
    {
        messageSubject.OnNext(message);
    }

    public IObservable<T> AsObservable<T>()
    {
        return this
            .messageSubject
            .Where(m => m is T)
            .Select(m => (T)m);
    }
}

Subscribing to messages:

bus.AsObservable<MyMessage>()
    .Do(m => Console.WriteLine(m))
    .Subscribe();

The nice thing about this is that you get automatic Linq support since it is built into RX.
So you can add message handlers that filters or transform messages.
Pretty slick isnt it?

I’m currenty writing an example IRC chat client based on this idea which I will publish in a week or two.

//Roger

7 Comments

  1. ram's avatar ram says:

    Hi, there is an extension method call “OfType” to filter a enumerable by its type.

  2. Roger Johansson's avatar Roger Alsing says:

    Yes, but this is not an enumerable, it is an “observable” from the RX Framework.. no such extension on those yet.

  3. John Rayner's avatar John Rayner says:

    OfType is also available as an extension method to IObservable. It would combine the Where and the Select from your message bus.

  4. Chuka Jon-Ubabuco's avatar Chuka Jon-Ubabuco says:

    I refactored the method using using LINQ.
    public IObservable AsObservable() { return this .messageSubject.OfType();}

  5. Chuka Jon-Ubabuco's avatar Chuka Jon-Ubabuco says:

    public IObservable<T> AsObservable<T>() { return this .messageSubject.OfType<T>(); }

Leave a reply to ram Cancel reply