Akka.NET + Azure: Azure ServiceBus integration

I know there is some confusion out there about how Akka.NET relates to products like NServiceBus and Azure ServiceBus. I think Akka.NET co-founder Aaron Stannard said it best:

They’re very complementary. Akka.NET makes a great consumer or producer for NServiceBus.

Another closely related question that comes up from time to time is how to integrate Akka.NET actors with service buses.

How can we pull messages from a service bus and pass them to a number of worker actors without message loss?

One approach relies on the fact that actors in Akka.NET support the Ask operator.
We can pass a message to an actor and expect a response, and this response is delivered in the form of a Task.

As the response is a task, we can pipe this task into a continuation and, depending on whether the response represents a processing success or failure from the worker actor, decide what to do with the service bus message.

If the response was a success, we want to ack the service bus message, telling the service bus that we are done with this message and that it can be removed from the queue.

If the response was a failure or a timeout, we simply ignore it and continue processing other messages.
As we haven’t acked the message to the service bus, the service bus will try to re-deliver the message to our client and we get the chance to try again some time later.

A simple implementation of this approach using Azure Service bus could look something like this:

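using System;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Routing;
using Microsoft.ServiceBus.Messaging;

namespace ConsoleApplication13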
{
    //define your worker actor
    public class MyBusinessActor : ReceiveActor
    {
        public MyBusinessActor()
        {
            //here is where you should receive your business messages
            //apply domain logic, store to DB etc.
            Receive<string>(s =>
            {
                Console.WriteLine("{0} Processed {1}", Self.Path, s);

                //reply to the sender that everything went well
                //in this example, we pass back the message we received in a built in `Success` message
                //you can send back a Status.Failure in case of exceptions if you want to
                //or just let it fail by timeout as we do in this example
                Sender.Tell(new Status.Success(s));
            });
        }
    }

    internal class Program
    {
        private static void Main(string[] args)
        {
            CreateMessages();

            using (var system = ActorSystem.Create("mysys"))
            {
                //spin up our workers
                //this should be done via config, but here we use a
                //hardcoded setup for simplicity

                //Do note that the workers can be spread across multiple
                //servers using Akka.Remote or Akka.Cluster
                var businessActor =
                    system.ActorOf(Props
                       .Create<MyBusinessActor>()
                       .WithRouter(new ConsistentHashingPool(10)));

                //start the message processor
                ProcessMessages(businessActor);

                //wait for user to end the application
                Console.ReadLine();
            }
        }

        private static async void ProcessMessages(IActorRef myBusinessActor)
        {
            //set up an Azure SB subscription client
            //(or use a Queue client, or whatever client your specific MQ supports)
            var subscriptionClient = SubscriptionClient.Create("service1","service1");

            while (true)
            {
                //fetch a batch of messages
                var batch = await subscriptionClient.ReceiveBatchAsync(100, TimeSpan.FromSeconds(1));

                //transform the messages into a list of tasks
                //the tasks will either be successful and ack the MQ message
                //or they will timeout and do nothing
                var tasks = (
                    from res in batch
                    let importantMessage = res.GetBody<string>()
                    let ask = myBusinessActor
                        .Ask<Status.Success>(new ConsistentHashableEnvelope(importantMessage,
                            importantMessage.GetHashCode()),TimeSpan.FromSeconds(1))
                    let done = ask.ContinueWith(t =>
                    {
                        if (t.IsCanceled || t.IsFaulted)
                        {
                            Console.WriteLine("Failed to ack {0}", importantMessage);
                        }
                        else
                        {
                            res.Complete();
                            Console.WriteLine("Completed {0}", importantMessage);
                        }
                    },TaskContinuationOptions.None)
                    select done).ToList();

                //wait for all messages to either succeed or timeout
                await Task.WhenAll(tasks);
                Console.WriteLine("All messages handled (acked or failed)");
                //continue with the next batch
            }
        }

        //dummy method only used to prefill the msgqueue with data for this example
        private static void CreateMessages()
        {
            var client = TopicClient.Create("service1");

            for (var i = 0; i < 100; i++)
            {
                client.Send(new BrokeredMessage("hello" + i)
                {
                    MessageId = Guid.NewGuid().ToString()
                });
            }
        }
    }
}

But do note that when applying this pattern, we go from the default Akka.NET “At most once” delivery to “At least once”.

Why?

Because if we fail to ack the message back to the service bus, we will eventually receive the same message again at a later time.

It could be that our worker actor has processed the message correctly and stored it in some persistent store, but the ack back to the client failed because of network problems, a timeout or something similar.

Thus, the service bus message will not be removed from the queue, and the client will receive it again as soon as whatever locking mechanism is in place frees the message.
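
Since the same message can now arrive more than once, the processing side has to tolerate duplicates. Here is a minimal sketch of one way to do that, assuming every service bus message carries a unique MessageId (as the ones created in CreateMessages above do) and that an in-memory set is good enough for illustration. The ProcessedMessageTracker class is hypothetical; it is not part of Akka.NET or the Azure SDK.

```csharp
using System;
using System.Collections.Generic;

//hypothetical helper, not part of Akka.NET or the Azure SDK
//remembers which message ids have already been handled
public class ProcessedMessageTracker
{
    private readonly HashSet<string> _seen = new HashSet<string>();

    //returns true the first time an id is seen, false for duplicates
    public bool TryMarkProcessed(string messageId)
    {
        if (messageId == null)
        {
            throw new ArgumentNullException("messageId");
        }

        //HashSet.Add returns false when the value is already present
        return _seen.Add(messageId);
    }
}
```

A worker actor could call TryMarkProcessed before applying its domain logic, skip the work for duplicates, and still reply with a Status.Success so the redelivered message finally gets acked. The set is not thread safe, which is fine inside a single actor. In a real system the ids would probably have to live in the same persistent store as the business data, otherwise they are lost on restart.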

One extremely nice feature in Akka.NET is the cluster support. Cluster nodes can be added to or removed from a live application, so we can easily spread our load over multiple worker actors on remote nodes.

We can do this without writing any special code; we just need to configure our actor system to be part of an Akka.NET cluster.

HTH

Learning Azure, Day 2 | Servicebus

This is a continuation of my completely random learning experiences while trying to learn the Azure platform.

Future messages

When you send a message on the Azure Servicebus, you have the option to set a ScheduledEnqueueTimeUtc property.
This value decides when the message becomes visible to the receivers; it is kind of like sending a message into the future.

So when is this useful?

Let’s say we want to send a payment reminder 20 days after an order was placed, but only if no payments have arrived.
We could solve this using future messages.

Consider the following flow of messages

Receive PlaceOrder message -> Send Reminder (set ScheduledEnqueueTimeUtc to 20 days in the future)

..20 days pass

Receive Reminder message -> Check if the order has been paid, if not, send the reminder to the buyer using snail mail or email

This way, you don’t have to build additional scheduling logic into your system or add polling tables to your database; it is handled by the Servicebus itself.
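
The first step of that flow could be sketched like this. It is only a sketch: the ReminderScheduler class, the queue name "queue1" and the message body format are my own placeholders. BrokeredMessage, ScheduledEnqueueTimeUtc and QueueClient come from Microsoft.ServiceBus.Messaging.

```csharp
using System;
using Microsoft.ServiceBus.Messaging;

//hypothetical helper for the payment reminder scenario
public static class ReminderScheduler
{
    //builds a reminder that the service bus holds back until
    //20 days after the order was placed
    public static BrokeredMessage CreateReminder(string orderId, DateTime orderPlacedUtc)
    {
        return new BrokeredMessage("Reminder for order " + orderId)
        {
            MessageId = "reminder-" + orderId,
            ScheduledEnqueueTimeUtc = orderPlacedUtc.AddDays(20)
        };
    }

    //sends the reminder right away; receivers will not see it until
    //the scheduled time has passed
    public static void ScheduleReminder(string orderId)
    {
        //"queue1" is a placeholder queue name
        var client = QueueClient.Create("queue1");
        client.Send(CreateReminder(orderId, DateTime.UtcNow));
    }
}
```

The handler for the Reminder message still has to check whether the order has been paid, since the message itself is sent at the time the order is placed.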

Looks super useful. I still have to look into how this scales, and whether messages can be kept for years without side effects.

That’s all for now

Learning Azure, Day 1 | Servicebus

I’ve decided to bite the bullet and finally dig into Azure.

My first learning experience was to toy around with the Servicebus.
Azure Servicebus is a message queue, pretty much like the old MSMQ but with a few more nifty features.
For example, messages can have a time to live duration, and you can choose between At most once and At least once delivery (ReceiveMode.ReceiveAndDelete vs ReceiveMode.PeekLock).

The first gotcha was that the sample I downloaded from MS recreated the queue I intended to communicate with,
which in turn causes your credentials for the queue to disappear. Don’t fall for that :-)

It also turns out that the built in batch operations are way more performant than single operations.
So if possible, go for batching if you need high throughput.

Another thing that was expected, but maybe not to that extent, was that using ReceiveAndDelete gives a lot better throughput than using PeekLock.
With PeekLock, you have to ack back to the queue yourself to notify the queue that you have dealt with the message.

This was using a single threaded client, so maybe the difference is not that big if processing async, I don’t know yet, but in the very naive example I’m using, there was a huge difference.
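
For comparison, here is roughly what the PeekLock side looks like. This is a minimal sketch and not the code I measured: the PeekLockReceiver class and its TryProcess helper are made up for the example, and "queue1" is a placeholder queue name. Receive, Complete and Abandon are the calls from Microsoft.ServiceBus.Messaging.

```csharp
using System;
using Microsoft.ServiceBus.Messaging;

//hypothetical helper showing the manual ack that PeekLock requires
public static class PeekLockReceiver
{
    //runs the handler and reports whether it finished without throwing
    public static bool TryProcess(string body, Action<string> handler)
    {
        try
        {
            handler(body);
            return true;
        }
        catch (Exception ex)
        {
            Console.WriteLine("Processing failed: {0}", ex.Message);
            return false;
        }
    }

    //receives a single message in PeekLock mode and settles it
    public static void ReceiveOne(Action<string> handler)
    {
        //"queue1" is a placeholder queue name
        var client = QueueClient.Create("queue1", ReceiveMode.PeekLock);

        //Receive returns null if nothing arrives within the timeout
        var message = client.Receive(TimeSpan.FromSeconds(5));
        if (message == null)
        {
            return;
        }

        if (TryProcess(message.GetBody<string>(), handler))
        {
            //ack: the queue can now delete the message
            message.Complete();
        }
        else
        {
            //release the lock so the message can be delivered again
            message.Abandon();
        }
    }
}
```

With ReceiveAndDelete there is no Complete or Abandon step at all; the message is gone as soon as it has been handed to the client, which fits with the throughput difference.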

And for those who are just looking for some example code, here it is:

using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using Microsoft.ServiceBus.Messaging;

namespace ConsoleApplication5
{
    class Program
    {
        private static QueueClient _queueClient;
        static void Main(string[] args)
        {          
            Console.WriteLine("Press anykey to start sending messages ...");
            Console.ReadKey();
            SendMessages();
            Console.WriteLine("Press anykey to start receiving messages that you just sent ...");
            Console.ReadKey();
            ReceiveMessages();
            Console.WriteLine("\nEnd of scenario, press anykey to exit.");
            Console.ReadKey();
        }

        private static void SendMessages()
        {
            // connect to queue named "queue1"
            // use At most once delivery
            _queueClient = QueueClient.Create("queue1", ReceiveMode.ReceiveAndDelete);

            var messageList = new List<BrokeredMessage>();

            for (int i = 0; i < 1000; i++)
            {
                messageList.Add(new BrokeredMessage("Some message")
                {
                    MessageId = i.ToString(CultureInfo.InvariantCulture)
                });
            }

            Console.WriteLine("\nSending messages to Queue...");

            // send 1000 messages to the queue
            _queueClient.SendBatch(messageList);
        }

        private static void ReceiveMessages()
        {
            Console.WriteLine("\nReceiving message from Queue...");

            while (true)
            {
                //receive a batch of messages from Queue
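                var messages = _queueClient.ReceiveBatch(100,
                    TimeSpan.FromMilliseconds(500)).ToList();

                //stop when no messages arrive within the timeout,
                //otherwise this loop never returns to Main
                if (messages.Count == 0)
                {
                    break;
                }

                Console.WriteLine("\nDone...");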

                foreach (var message in messages)
                {
                    if (message != null)
                    {
                        Console.WriteLine("Message received: Id = {0}, Body = {1}", 
                            message.MessageId, message.GetBody<string>());
                    }
                }
            }
        }
    }
}