There are many different flavors of queuing system out there: MSMQ, RabbitMQ, Amazon Simple Queue Service, IBM WebSphere and more. Windows Azure Service Bus Brokered Messaging is a queuing system that is a scalable, multi-featured messaging service hosted in Windows Azure, or available as part of the Windows Azure Pack in your own data center.
Microsoft offers two different queuing technologies in Windows Azure, and they can be easily confused. This article will focus on the Windows Azure Service Bus Brokered Messaging service, but there is also Windows Azure Storage Queues. Unless I indicate otherwise, I’ll be describing the Service Bus, or will directly refer to the other queue service as Storage Queues.
The examples in this article are written in C# using the .NET Client Library for Service Bus; however, the majority of the features of Service Bus Brokered Messaging are exposed through a REST-based API. The documentation for the REST API can be found online at WindowsAzure.com, and there are also examples of how to use Service Bus Brokered Messaging with Node.js, Python, Java, PHP and Ruby.
What’s in a Queue?
A message queue provides asynchronous, decoupled message communication between two or more pieces of code. A producer of a message can submit that message to a queue and know that it is guaranteed to be delivered to one or more consumers on the other side. This decoupling is a great way to buffer requests for systems that need to scale and to introduce some resilience into your solution.
The advantage of using a message queue in an application is that the sender and receiver of the message do not need to interact with the message queue at the same time. They can work asynchronously. Messages placed onto the queue are stored until the recipient is able to retrieve them and act upon them. Messages can originate from one or more sources, often referred to as producers. The messages stay in the queue until they are processed, generally in order, by one or more consumers. If you want to speed up the processing you can usually add more consumers doing the processing.
Take for example an order system which sells widgets. The front end web site provides a catalog of the widgets for sale where a lot of visitors come to look at the widgets, comparing them and reading reviews. The back end code for the system knows how to process orders for widgets. Between the front end and the processing back end sits a queue. If a visitor purchases a widget, a message is sent to a queue and is processed by the back end.
We get several advantages from separating the work of the producer of the messages from the consumer of those messages. First, while it would be great if every visitor that came to the site bought a widget, it’s likely that isn’t the case. We can get better resource management if we can scale the two processes independently. Second, if our processing system goes down for some reason, we can rely on the fact that the order messages coming in from the web site will be in the queue when the system comes back online. Finally, our system can also handle sudden spikes in the number of visitors who decide to purchase widgets. The time it takes to actually process the order is decoupled from the user, so if the site experiences a rush of purchases the orders can be captured on the queue and processed as the back end has time. The number of back end processors can even be increased to help speed things up if necessary.
Getting Started
At its core, the Windows Azure Service Bus Brokered Messaging feature is a queue, albeit a queue with quite a few very useful features. To get started, you will need to have a Windows Azure account. You can get a free trial account or, if you have an MSDN subscription, you can sign up for your Windows Azure benefits in order to experiment with the features detailed in this article. Once you have an Azure account you can then create a queue.
Creating a Queue
To create a queue, log in to the Windows Azure management portal. After you are logged in you can quickly create a Service Bus Queue by clicking on the large New button at the bottom left of the portal.
From the expanding menu, select the ‘App Services’ option, then ‘Service Bus’, then ‘Queue’ and finally, ‘Quick Create’.
When using ‘quick create’, we need to provide a queue name, a location for the data and a namespace. You can look at the advanced options for creating a queue using ‘custom create’ later, but for now the ‘quick create’ will be fine.
Just as in .NET or XML, a namespace in Service Bus is a way of scoping your service bus entities so that similar or related services can be grouped together. Within a Service Bus Namespace, you can create several different types of entities: queues, topics, relays and notification hubs. In this article we will cover Service Bus queues in depth and mention Service Bus topics. The namespace will also be hosted in a given data center, such as East US, West US, etc. The data for that namespace will only be in that location.
If you don’t already have a namespace in the selected location when using ‘quick create’, the portal will suggest a default namespace name made up of the queue name plus “-ns”. You can modify that if you wish. If you already have one or more namespaces in the selected location, you can select an existing namespace instead. The namespace name must be globally unique since it is used as part of the URI to the service; the queue name only has to be unique within the namespace.
Once you’ve filled in the three values, click ‘Create a New Queue’ at the bottom of the screen. The portal will then create the new queue and within a few seconds you should have a queue that is ready to use.
Retrieving the Connection String
Before we interact with the queue in code, we need a little more information from the portal, namely the connection string. The connection string will contain the URI and the credentials that we need to access the queue. In this way it is very similar to a connection string used by SQL Server.
Each Service Bus Namespace that is created is provisioned with a Windows Azure Active Directory Access Control Service (ACS) namespace (yes, that’s an impressively long name). Within this ACS namespace, a default credential called owner is created, but that credential will have full permissions on anything within the namespace. You can see this credential when looking at the namespace (not the queue) in the portal if you click on the ‘Connection Information’ icon in the command bar at the bottom of the screen. This credential is something you don’t want to use unless you are managing the namespace, and is certainly not something you would hand out to a partner who needs to send you messages on a queue. You can use ACS to create other credentials to secure your service bus entities, or you can use Shared Access Policies, which allow you to define the permissions and get a connection string. A discussion of ACS is beyond the scope of this article, so instead we will create a quick Shared Access Policy to use.
To create a Shared Access Policy for a queue, select the queue from the management portal by clicking on the queue name, then click on the ‘Configure’ tab. On this tab you’ll see the ‘shared access policies’ section. Give the policy a name. You can name this whatever you wish, but it should be something that has some meaning. You could name it after the application that will use the policy, the type of permissions it comprises, etc. At the time of writing, you can create only up to twelve policies per entity (queue, topic, etc.), so naming the policy for a partner or client might not scale. If you need many credentials you should research using ACS credentials for your purposes.
In the screenshot below you can see that a Shared Access Policy named submitandprocess is created and given the rights to Listen and Send. This means that any client using this policy will be able to both send messages to this queue and listen to the queue, meaning that it can process the messages off the queue. Create a policy like the one you see below and click the ‘Save’ icon at the bottom of the screen.
After the Shared Access Policy is created the screen is updated to show you a primary and secondary key for that policy. These keys are both valid for the policy name and can be regenerated by clicking on the regenerate button next to each key. When you click regenerate you are effectively disabling any client that might be using that policy with the key provided.
These keys should be kept secret, otherwise anyone with this information has access to whatever permissions were assigned to the policy. Don’t worry, I’ve already regenerated the ones you see above.
To get a copy of the connection string, you’ll need to switch over to the Dashboard tab for the queue and view the connection string information. Click on the ‘Connection Information’ icon at the bottom of the screen. Then, when you hover over the end of the connection string, you’ll get a copy icon. You can use this to get a copy of the full connection string.
It seems like there have been a lot of steps just to get a queue set up, but in reality this doesn’t take long at all. There are actually many different ways you can create a queue. For example, you can create queues on the fly in code as long as you have the credentials with the correct permissions to do so.
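As a rough illustration of creating a queue from code, the sketch below uses the NamespaceManager class from the Microsoft.ServiceBus assembly (installed by the NuGet package covered in the next section). It assumes a connection string for a namespace-level credential that has the Manage right; the queue name and the two settings shown are only illustrative values.

```csharp
using System;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;

class CreateQueueSample
{
    static void Main()
    {
        // Assumption: this connection string carries the Manage right on the namespace.
        string connectionString = "<YourManageConnectionStringHere>";
        NamespaceManager namespaceManager =
            NamespaceManager.CreateFromConnectionString(connectionString);

        // Only create the queue if it is not already there.
        if (!namespaceManager.QueueExists("samplequeue"))
        {
            QueueDescription description = new QueueDescription("samplequeue")
            {
                LockDuration = TimeSpan.FromSeconds(30), // illustrative value
                MaxDeliveryCount = 10                    // illustrative value
            };
            namespaceManager.CreateQueue(description);
            Console.WriteLine("Queue created.");
        }
        else
        {
            Console.WriteLine("Queue already exists.");
        }
    }
}
```

A policy that only grants Send and Listen, like the one created above, would not be enough for this; management operations need the Manage right.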
Let’s get to some Code!
Now that a queue is created out there for us to send to, and receive messages on, we can start in with some code to do just that. For the first example we will use a C# console application that will send a message.
Using Visual Studio, create a C# Console application from the standard template. By default the created project will not have a reference to the Service Bus library, but through the wonders of NuGet we can fix that quickly. Right-click on the project and select ‘Manage NuGet Packages…’ from the context menu.
Once the Package Manager dialog is loaded, select the ‘Online’ tab and search for ‘Service Bus’. Select the Windows Azure Service Bus package from Microsoft and click ‘Install’. At the time of writing, version 2.2.1.1 was the newest stable version.
If you prefer to use the PowerShell Package Manager Console, you can also use the command ‘Install-Package WindowsAzure.ServiceBus’ to install the package. The package will install two assemblies: Microsoft.ServiceBus and Microsoft.WindowsAzure.Configuration.
Sending a Message
Open the program.cs file in your project and add the following using statement to the top of the file:
using Microsoft.ServiceBus.Messaging;
Add the following to the Main method, making sure to include the connection string we copied out of the portal (remove the < and > as well):
string connectionString = "<YourConnectionStringHere>";
MessagingFactory factory = MessagingFactory.CreateFromConnectionString(connectionString);
//Sending a message
MessageSender testQueueSender = factory.CreateMessageSender("samplequeue");
BrokeredMessage message = new BrokeredMessage("Test message");
testQueueSender.Send(message);
Console.WriteLine("Message(s) sent.");
Console.WriteLine("Done, press a key to continue...");
Console.ReadKey();
You should be able to execute this and see that a message is sent. We’ll verify it actually went somewhere soon, but for now let’s look at the code. We create a MessagingFactory object using the connection string from the portal. The connection string contains the base URI to the namespace and the credentials in the form of the Shared Access Policy name and key. These credentials are used whenever communication occurs with the Service Bus to verify that the caller actually does have rights to interact with the service. All the communication is also secured at the transport layer so it is encrypted going over the wire.
We use the CreateMessageSender method from this factory object to create a MessageSender instance, which is what is used to actually send a message to the queue. The name of the queue is passed in as a parameter to the CreateMessageSender method.
If you look over the methods that are available to you on the MessagingFactory, you’ll also see a CreateQueueClient method. The MessageSender is an abstraction and we are using it in place of the QueueClient. Unless there is some functionality you absolutely must have from QueueClient, I highly recommend that you use the MessageSender abstraction. We will touch on why this abstraction exists later in the article. Note that creating a MessageSender can be an expensive operation so it is best to create one and reuse it when possible.
The type we actually send is a BrokeredMessage, so the code next creates one of these. The constructor used here appears to be taking in a string of “Test message”, but it’s actually taking in the string as an object and will serialize the object using the DataContractSerializer by default. There are also overloads where you can pass along a different XmlObjectSerializer to use.
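Since the constructor accepts any serializable object, a message body does not have to be a string. The following is a minimal sketch of sending a custom type; WidgetOrder, its members, the MessageId format and the OrderType property are all hypothetical names invented for illustration.

```csharp
using System;
using System.Runtime.Serialization;
using Microsoft.ServiceBus.Messaging;

// Hypothetical order type; DataContract/DataMember mark it for the DataContractSerializer.
[DataContract]
public class WidgetOrder
{
    [DataMember] public int OrderId { get; set; }
    [DataMember] public string WidgetName { get; set; }
    [DataMember] public int Quantity { get; set; }
}

class SendOrderSample
{
    static void Main()
    {
        string connectionString = "<YourConnectionStringHere>";
        MessagingFactory factory = MessagingFactory.CreateFromConnectionString(connectionString);
        MessageSender sender = factory.CreateMessageSender("samplequeue");

        WidgetOrder order = new WidgetOrder { OrderId = 42, WidgetName = "Sprocket", Quantity = 3 };

        using (BrokeredMessage message = new BrokeredMessage(order))
        {
            // Optional metadata travels in the message header, not in the body.
            message.MessageId = "order-" + order.OrderId;
            message.Properties["OrderType"] = "widget";
            sender.Send(message);
        }

        sender.Close();
        factory.Close();
        Console.WriteLine("Order sent.");
    }
}
```

A consumer of this message would need to call GetBody<WidgetOrder>() rather than GetBody<string>() to read the body back.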
The maximum size of message you can send is 256KB, with a maximum of 64KB for the header, including any metadata. The client library will break up the message into 64KB chunks to actually send it over the wire, so be aware that the larger messages will incur more transactions. Be careful when serializing objects into messages, because their serialized size can be larger than the object size in memory. If you need to send larger messages you may want to look at storing the message somewhere and sending a pointer to the data over the service bus.
Finally, we call Send on the MessageSender, passing the instance of the message. This method will deliver the message to the queue. If no exception occurs the message is successfully delivered.
Retrieving a Message
The next step is to retrieve the message. We’ll create a new C# Console application to act as the consumer of the messages. In your Visual Studio solution add a new Project for the consumer of the messages. Once the new project is created, use the NuGet package manager to add the same Service Bus package that you added to the first project.
Just as before, add the following using statement to the top of your program.cs file for the consumer:
using Microsoft.ServiceBus.Messaging;
Add the following code to the Main method of the new project, again making sure to add your connection string:
string connectionString = "<YourConnectionStringHere>";
MessagingFactory factory = MessagingFactory.CreateFromConnectionString(connectionString);
//Receiving a message
MessageReceiver testQueueReceiver = factory.CreateMessageReceiver("samplequeue");
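while (true)
{
    // Receive() returns null if no message arrives before the default timeout.
    using (BrokeredMessage retrievedMessage = testQueueReceiver.Receive())
    {
        if (retrievedMessage == null)
        {
            continue;
        }
        try
        {
            Console.WriteLine("Message(s) Retrieved: " + retrievedMessage.GetBody<string>());
            // Processing succeeded, so remove the message from the queue.
            retrievedMessage.Complete();
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.ToString());
            // Release the lock so the message can be received again.
            retrievedMessage.Abandon();
        }
    }
}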
This code is not that different from the previous code when it comes to the interactions of setting up connections to the Service Bus. The only real difference is that we are creating a MessageReceiver and performing a Receive to retrieve a message from the queue within a loop. The code is performing a Receive within the loop so if you run just this console application you will see it write out the message (or messages) that are in the queue. It is very common to see message processing code in an infinite loop like you see above, or only breaking out of the loop if it has been instructed to stop processing by an outside source.
In the example above we are using the ‘parameterless’ overload of Receive, which is a synchronous and blocking call. Execution of our code will not continue until a message is received from the service bus, an exception occurs, or the default operation timeout elapses, in which case Receive returns null. There is another Receive overload which takes a TimeSpan as a parameter, which allows you to set your own wait time. This is useful if you want to break out of the receive loop from time to time to verify that you should still be processing messages. For more ways to handle retrieving a message you can research the async versions of the Receive method and even the more event-based approach using the OnMessage method of the MessageReceiver.
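The TimeSpan overload can be sketched roughly as follows. The five second wait and the keepRunning flag (set here by pressing Ctrl+C) are illustrative assumptions; a real service would more likely be told to stop by its host.

```csharp
using System;
using Microsoft.ServiceBus.Messaging;

class TimedReceiveSample
{
    // Hypothetical stop flag, flipped when the user presses Ctrl+C.
    static volatile bool keepRunning = true;

    static void Main()
    {
        Console.CancelKeyPress += (sender, e) => { e.Cancel = true; keepRunning = false; };

        string connectionString = "<YourConnectionStringHere>";
        MessagingFactory factory = MessagingFactory.CreateFromConnectionString(connectionString);
        MessageReceiver receiver = factory.CreateMessageReceiver("samplequeue");

        while (keepRunning)
        {
            // Wait up to five seconds; null means nothing arrived in that time.
            BrokeredMessage message = receiver.Receive(TimeSpan.FromSeconds(5));
            if (message == null)
            {
                continue; // loop around and re-check the stop flag
            }

            using (message)
            {
                try
                {
                    Console.WriteLine("Retrieved: " + message.GetBody<string>());
                    message.Complete();
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.ToString());
                    message.Abandon();
                }
            }
        }

        receiver.Close();
        factory.Close();
    }
}
```

A short wait keeps the loop responsive to a stop request at the cost of more frequent calls to the service when the queue is empty.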
Once we have the BrokeredMessage instance, we can retrieve the content of the message by using the GetBody<T> method. If we had used an XmlObjectSerializer other than the default, we would need to pass an instance of the correct serializer into the GetBody call so that the object can be correctly retrieved. In this simple example, the code retrieves the string from the message and writes it to the console; however, this message could represent order details or just about anything.
As a message is retrieved from the queue it is pulled from the queue in one of two modes: either ReceiveAndDelete or PeekLock. You can control which mode is used when you create the MessageReceiver. For example:
MessageReceiver testQueueReceiver = factory.CreateMessageReceiver("samplequeue",ReceiveMode.ReceiveAndDelete);
By default, a message is retrieved using the PeekLock mode of retrieval, which is what the original code example is doing. This means that the message is locked on the queue so that it cannot be retrieved by another consumer and a peek is performed so that the consumer is handed a copy to process. Once the code has finished processing, it calls Complete on the message. This notifies the service bus that this message is completed and can be permanently removed from the queue. Likewise, if something goes wrong during the processing and an exception is thrown, then the code calls Abandon on the message, notifying the service bus that the message could not be processed and it can be retrieved by another consumer (or possibly even the same consumer).
By using PeekLock, we ensure that our message will be on the queue until it is processed at least once. Each message that is removed from the queue using PeekLock is locked for a period of time which is specified by the LockDuration of the queue itself. This value is set at the time that the queue is created. The maximum value is five minutes, but the default value is 30 seconds. Since we quickly created our queue for this sample, it has the default value of a 30 second lock. If you need to, you can change the lock duration for a queue after it is created from the Management portal or from code.
When a BrokeredMessage is pulled from the queue using PeekLock, it is assigned a GUID as a LockToken. This token is only valid during the lock duration. If for some reason the code takes too long to process the message and the lock expires, the call to Complete will throw a MessageLockLostException since the token is no longer valid. The message might have even already been handed off to another consumer to process by that time. If necessary you can also call RenewLock on the message instance while processing, which will extend a still valid lock by the time set for the LockDuration on the queue. In this way you can keep extending the time until you are finished processing if necessary.
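As a sketch of how RenewLock might be used during long-running work, the example below renews the lock between steps and treats a lost lock separately from other failures. DoSlowWork and the three-step loop are hypothetical stand-ins for real processing.

```csharp
using System;
using System.Threading;
using Microsoft.ServiceBus.Messaging;

class RenewLockSample
{
    // Hypothetical stand-in for one slow step of processing.
    static void DoSlowWork(int step)
    {
        Thread.Sleep(TimeSpan.FromSeconds(20));
        Console.WriteLine("Finished step " + step);
    }

    static void Main()
    {
        string connectionString = "<YourConnectionStringHere>";
        MessagingFactory factory = MessagingFactory.CreateFromConnectionString(connectionString);
        MessageReceiver receiver = factory.CreateMessageReceiver("samplequeue");

        using (BrokeredMessage message = receiver.Receive(TimeSpan.FromSeconds(5)))
        {
            if (message == null)
            {
                Console.WriteLine("No message available.");
                return;
            }

            try
            {
                for (int step = 1; step <= 3; step++)
                {
                    DoSlowWork(step);
                    // Extend the lock before it expires; this only works while it is still valid.
                    message.RenewLock();
                    Console.WriteLine("Lock valid until (UTC): " + message.LockedUntilUtc);
                }
                message.Complete();
            }
            catch (MessageLockLostException)
            {
                // The lock expired; the message is back on the queue for another attempt.
                Console.WriteLine("Lock was lost before processing finished.");
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.ToString());
                message.Abandon();
            }
        }
    }
}
```

Each step here is assumed to be shorter than the queue’s lock duration; if a single step can run longer than that, the renewal would need to happen on a timer instead.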
The second receive mode is ReceiveAndDelete, which does exactly what it sounds like it does: once the message is received by a consumer, it is immediately removed from the queue. This is referred to as ‘At Most Once’ delivery. This saves the overhead of the two-phase update to process a message; however, if something were to happen during processing and the consumer failed to process the message, then that message would be lost. You might use this approach if you decide that you don’t need to process every message that comes through the system and it is acceptable if some messages are lost. If you take this approach, it would be a good idea to have detailed logging to determine just how many messages you might be losing.
A Word about Idempotency
When you were reading the information above you might have questioned the phrase “processed at least once.” That might sound very risky to some people, and if you aren’t careful it can be dangerous. In a distributed system many things can fail or run into issues, so it is possible that a message is picked off the queue to be processed multiple times before it is fully completed. If this is the case you have to understand what that means to your system.
The word ‘idempotent’ means that the operation can be performed multiple times, and beyond the first time it is performed the result is not changed. An example of an idempotent operation is a database script that inserts data into a table only if the data isn’t already present. No matter how many times the script is executed beyond the first time the result is that the table contains the data. Idempotency such as this can also be needed when working with message processing if the messages could potentially be processed more than once.
Some message-processing will be inherently idempotent. For example, if a system generates image thumbnails of a larger file stored in BLOB storage it could be that it doesn’t matter how many times the message is processed; the outcome is that the thumbnails are generated and they are the same every time. On the other hand, there are operations such as calling a payment gateway to charge a credit card that may not be idempotent at all. In these cases you will need to look at your system and ensure that processing a message multiple times has the effect that you want and expect.
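One common way to make a non-idempotent operation safe is to remember which messages have already been handled and skip the repeats. The sketch below shows the idea with an in-memory set; IdempotentProcessor and TryProcess are hypothetical names, and the message ID passed in is assumed to be something stable such as the MessageId of the BrokeredMessage.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper that remembers which message IDs have been handled.
public class IdempotentProcessor
{
    private readonly HashSet<string> processedIds = new HashSet<string>();

    // Runs the work only the first time a given message ID is seen.
    // Returns true if the work ran, false if the message was a duplicate.
    public bool TryProcess(string messageId, Action work)
    {
        if (processedIds.Contains(messageId))
        {
            return false;
        }
        work();
        // Recorded only after the work succeeds, so a failed attempt can be retried.
        processedIds.Add(messageId);
        return true;
    }
}

class IdempotencyDemo
{
    static void Main()
    {
        IdempotentProcessor processor = new IdempotentProcessor();
        int charges = 0;

        processor.TryProcess("order-42", () => charges++); // first delivery
        processor.TryProcess("order-42", () => charges++); // redelivery of the same message

        Console.WriteLine("Charges made: " + charges); // prints "Charges made: 1"
    }
}
```

An in-memory set is lost when the process restarts and is not shared between consumers, so a real system would keep this record in durable storage, ideally updated in the same transaction as the work itself.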
But Wait, There’s More!
Service Bus Brokered Messaging is so much more than a simple queuing technology and it goes well beyond the very basic example above. There are many additional features, each of which could easily have an article dedicated to it. Below is a very brief description of some of these features:
Dead lettering: From time to time a message may arrive in your queue that just can’t be processed. Each time the message is retrieved for processing the consumer throws an exception and cannot process the message. These are often referred to as poison messages and can happen for a variety of reasons, such as a corrupted payload, a message containing an unknown payload inadvertently delivered to a wrong queue, etc. When this happens, you do not want your system to come grinding to a halt simply because one of the messages can’t be processed.
Ideally the message will be set aside to be reviewed later and processing can continue on to other messages in the queue. This process is called ‘dead lettering’ a message and Service Bus Brokered Messaging supports it by default. If a message has been delivered ten times without being completed, it is placed into a dead letter queue. You can control the number of failed deliveries it takes for a message to be dead lettered by setting the MaxDeliveryCount property on the queue, which defaults to 10. When a message is dead lettered it is actually placed on a sub queue which can be accessed just like any other Service Bus queue. In the example used above the dead letter queue path would be samplequeue/$DeadLetterQueue.
Automatic dead lettering does not occur in the ReceiveAndDelete mode as the message has already been removed from the queue.
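Reading from the dead letter sub queue might look something like the sketch below. It builds the path with QueueClient.FormatDeadLetterPath and prints the DeadLetterReason property that the service attaches to dead lettered messages; completing each message after printing it is just an assumption for the example, since a real review process would probably log or store it first.

```csharp
using System;
using Microsoft.ServiceBus.Messaging;

class DeadLetterReaderSample
{
    static void Main()
    {
        string connectionString = "<YourConnectionStringHere>";
        MessagingFactory factory = MessagingFactory.CreateFromConnectionString(connectionString);

        // Produces the sub queue path for the sample queue's dead letter queue.
        string deadLetterPath = QueueClient.FormatDeadLetterPath("samplequeue");
        MessageReceiver deadLetterReceiver = factory.CreateMessageReceiver(deadLetterPath);

        BrokeredMessage message;
        // Drain the dead letter queue until no message arrives within five seconds.
        while ((message = deadLetterReceiver.Receive(TimeSpan.FromSeconds(5))) != null)
        {
            using (message)
            {
                object reason;
                message.Properties.TryGetValue("DeadLetterReason", out reason);

                Console.WriteLine("Dead lettered message " + message.MessageId
                    + ", delivery count " + message.DeliveryCount
                    + ", reason: " + (reason ?? "(none)"));

                // Remove it from the dead letter queue once it has been reviewed.
                message.Complete();
            }
        }

        deadLetterReceiver.Close();
        factory.Close();
    }
}
```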
Deferring: You may run into a situation where you want to defer the processing of a given message. You can do this using the Defer method on the BrokeredMessage instance. When you call Defer the service bus will leave the message on the queue, but it will effectively be invisible until it is explicitly retrieved using the Receive overload that accepts a sequence number. You will need to read the SequenceNumber property from the message instance and keep track of it in your own code in order to retrieve the message later.
Deferring does not work in the ReceiveAndDelete mode as the message has already been removed from the queue.
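A minimal sketch of deferring a message and fetching it again by sequence number is shown below. Holding the sequence number in a local variable is only for illustration; real code would need to store it somewhere durable, because a deferred message cannot be found again without it.

```csharp
using System;
using Microsoft.ServiceBus.Messaging;

class DeferSample
{
    static void Main()
    {
        string connectionString = "<YourConnectionStringHere>";
        MessagingFactory factory = MessagingFactory.CreateFromConnectionString(connectionString);
        MessageReceiver receiver = factory.CreateMessageReceiver("samplequeue");

        BrokeredMessage message = receiver.Receive(TimeSpan.FromSeconds(5));
        if (message == null)
        {
            Console.WriteLine("No message available.");
            return;
        }

        // Remember the sequence number before deferring; it is the only way back to the message.
        long sequenceNumber = message.SequenceNumber;
        message.Defer();
        message.Dispose();
        Console.WriteLine("Deferred message with sequence number " + sequenceNumber);

        // ... later, when the application is ready to handle it ...
        using (BrokeredMessage deferred = receiver.Receive(sequenceNumber))
        {
            Console.WriteLine("Retrieved deferred message: " + deferred.GetBody<string>());
            deferred.Complete();
        }

        receiver.Close();
        factory.Close();
    }
}
```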
Retry Policies: As with any system, there may be errors when you perform an operation against the service bus. Some of these errors will be transient in nature, like a networking hiccup or timeout. When this happens you don’t want to lose the message, or stop your processing if the error is actually recoverable. In these cases you can configure a Retry Policy on the MessagingFactory, MessageReceiver or MessageSender instances. By default a retry policy is defined for you, but you can substitute your own as well.
Sessions: In cases where you wish to send messages greater than 256KB, or if you simply want to send messages that need to be processed together by the same consumer, you can use the ‘sessions’ feature. This is an advanced feature of Service Bus Brokered Messaging and even includes the ability to store the state of sessions as you wait for more messages from a session to arrive.
Topics & Subscriptions: This is an extremely powerful feature of Service Bus Brokered Messaging in which you can create a publish and subscribe distribution. A topic is a special type of queue. Messages delivered to a topic are then delivered to any subscription that is signed up to receive them. You can even apply filters based on metadata in the message headers to provide routing of messages. The message producers deliver to a topic and message consumers receive messages from a subscription. At the code level these are different entities than the QueueClient, which is why the abstraction of the MessageSender and MessageReceiver is useful. When using the abstractions your code doesn’t have to care if it is dealing with a regular queue or a topic/subscription.
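To show why the abstraction is convenient, the sketch below sends to a topic and receives from a subscription using the same MessageSender and MessageReceiver types as the queue examples. The topic name sampletopic and the subscription name allorders are hypothetical and are assumed to exist already in the namespace.

```csharp
using System;
using Microsoft.ServiceBus.Messaging;

class TopicSample
{
    static void Main()
    {
        string connectionString = "<YourConnectionStringHere>";
        MessagingFactory factory = MessagingFactory.CreateFromConnectionString(connectionString);

        // Producers send to the topic exactly as they would send to a queue.
        MessageSender sender = factory.CreateMessageSender("sampletopic");
        using (BrokeredMessage message = new BrokeredMessage("Test message"))
        {
            sender.Send(message);
        }

        // Consumers receive from a subscription; only the entity path differs from the queue case.
        string subscriptionPath = SubscriptionClient.FormatSubscriptionPath("sampletopic", "allorders");
        MessageReceiver receiver = factory.CreateMessageReceiver(subscriptionPath);

        using (BrokeredMessage received = receiver.Receive(TimeSpan.FromSeconds(5)))
        {
            if (received != null)
            {
                Console.WriteLine("Retrieved from subscription: " + received.GetBody<string>());
                received.Complete();
            }
        }

        sender.Close();
        receiver.Close();
        factory.Close();
    }
}
```

Apart from the entity paths, nothing here differs from the queue code, so switching an application from a queue to a topic and subscription can be largely a configuration change.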