Today at the Build conference in Anaheim, California, Satya Nadella, President of the Server and Tools Business, announced the general availability of the production release of AppFabric Queues and Topics, otherwise known as Brokered Messaging.
Brokered Messaging introduces durable queue capabilities and rich, durable pub-sub with topics and subscriptions that complement the existing Relayed Messaging capabilities.
I covered Brokered Messaging following the May CTP release of Queues and followed up shortly with an overview and exploration of Topics (please see some other great resources at the end of this post).
Since then, there was a June CTP release, which included the new AppFabric Application and no visible changes to Brokered Messaging. Since that release, however, the AppFabric Messaging team has been hard at work refining the API and behaviors based on feedback from Advisors, MVPs and the community at large.
Since I’ve already covered Queues and Topics in the aforementioned posts, I’ll dive right in to some terse examples that demonstrate the API changes. This is not an exhaustive review of all of the changes, but I’ve covered the types that you’re most likely to come across. I will cover Queues, Topics and Subscriptions extensively in my upcoming article in CODE Magazine, which will also include more in-depth walk-throughs of the .NET Client API, REST API and WCF scenarios.
Those of you who have worked with the CTPs will find some subtle and not-so-subtle changes, but all in all I think the refinements are for the best, and I think you’ll appreciate them as I have. For those new to Azure AppFabric Service Bus Brokered Messaging, you’ll benefit most from reading my first two posts based on the May CTP (or any of the resources at the end of this post) to get an idea of the why behind queues and topics, and then coming back here to explore the what and how.
A Quick Note on Versioning
In the CTPs that preceded the release of the new Azure AppFabric Service Bus features, a temporary assembly called “Microsoft.ServiceBus.Messaging.dll” was added to serve as a container for new features and deltas that were introduced during the development cycle. The final release includes a single assembly called “Microsoft.ServiceBus.dll”, which contains all of the existing relay capabilities that you’re already familiar with, plus support for queues and topics. If you are upgrading from the CTPs, you’ll want to get hold of the new Microsoft.ServiceBus.dll version 1.5.
The new 1.5 version of the Microsoft.ServiceBus.dll assembly targets the .NET 4.0 framework. Customers using .NET 3.5 can continue using the existing Microsoft.ServiceBus.dll assembly (version 1.0.1123.2) to leverage the relay capabilities, but must upgrade to .NET 4.0 to take advantage of the latest features presented here.
.NET Client API
Queues
| May/June CTP | General Availability | Comments |
|---|---|---|
| ServiceBusNamespaceClientSettings | NamespaceManagerSettings | New class for encapsulating endpoint and security settings. |
| N/A | TokenProvider | New class for acquiring a WRAP token from ACS. |
| ServiceBusNamespaceClient | NamespaceManager | Root management object for creating Queues, Topics, Subscriptions. |
| Queue | QueueDescription | In May/June CTP, Topic / Queue / Subscription were management objects. All create/delete operations were moved to NamespaceManager and the state operations are now on TopicDescription/QueueDescription etc. |
| MessagingFactorySettings | MessagingFactorySettings | New class for encapsulating security settings. |
| MessagingFactory | MessagingFactory | |
| BrokeredMessage | BrokeredMessage | No longer a factory. Simply instantiate a BrokeredMessage. |
| MessageSender | MessageSender | Optional, for use when you want to abstract away queue or topic. |
| MessageReceiver | MessageReceiver | Optional, for use when you want to abstract away queue or topic. |
Below is a representative sample for creating, configuring, sending and receiving a message on a queue:
Administrative Operations
    using System;
    using System.Linq;
    using Microsoft.ServiceBus;
    using Microsoft.ServiceBus.Messaging;

    // Configure and create NamespaceManager for performing administrative operations
    NamespaceManagerSettings settings = new NamespaceManagerSettings();
    TokenProvider tokenProvider = settings.TokenProvider = TokenProvider.CreateSharedSecretTokenProvider(issuer, key);

    NamespaceManager manager = new NamespaceManager(ServiceBusEnvironment.CreateServiceUri("sb", serviceNamespace, string.Empty), settings);

    // Check for existence of queues on the fabric
    var qs = manager.GetQueues();

    var result = from q in qs
                 where q.Path.Equals(queueName, StringComparison.OrdinalIgnoreCase)
                 select q;

    if (!result.Any())
    {
        Console.WriteLine("Queue does not exist");

        // Create Queue with a 5 second lock duration for PeekLock receives
        Console.WriteLine("Creating Queue...");

        manager.CreateQueue(new QueueDescription(queueName) { LockDuration = TimeSpan.FromSeconds(5.0d) });
    }
Runtime Operations
    // Create and configure MessagingFactory to provision QueueClient,
    // reusing the token provider created for the NamespaceManager above
    MessagingFactorySettings messagingFactorySettings = new MessagingFactorySettings();
    messagingFactorySettings.TokenProvider = tokenProvider;
    MessagingFactory messagingFactory = MessagingFactory.Create(ServiceBusEnvironment.CreateServiceUri("sb", serviceNamespace, string.Empty), messagingFactorySettings);

    QueueClient queueClient = messagingFactory.CreateQueueClient(queueName, ReceiveMode.PeekLock);

    Order order = new Order();
    order.OrderId = 42;
    order.Products.Add("Kinect", 70.50M);
    order.Products.Add("XBOX 360", 199.99M);
    order.Total = order.Products["Kinect"] + order.Products["XBOX 360"];

    // Create a Brokered Message from the Order object
    BrokeredMessage msg = new BrokeredMessage(order);

    /***********************
     *** Send Operations ***
     ***********************/

    queueClient.Send(msg);

    /**************************
     *** Receive Operations ***
     **************************/

    BrokeredMessage recdMsg;
    Order recdOrder;

    // Receive and lock message
    recdMsg = queueClient.Receive();

    if (recdMsg != null)
    {
        // Convert from BrokeredMessage to native Order
        recdOrder = recdMsg.GetBody<Order>();

        Console.ForegroundColor = ConsoleColor.Green;
        Console.WriteLine("Received Order {0} \n\t with Message Id {1} \n\t and Lock Token:{2} \n\t from {3} \n\t with total of ${4}", recdOrder.OrderId, recdMsg.MessageId, recdMsg.LockToken, "Receiver 1", recdOrder.Total);

        // Complete read, release and delete message from the fabric
        recdMsg.Complete();
    }
    queueClient.Close();
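The samples use an Order type without showing its definition. Below is a minimal sketch of what such a data contract might look like. The member names are inferred from the sample code, and the class itself is an assumption rather than the definition used in the original project. As far as I know, BrokeredMessage serializes an object body with the DataContractSerializer by default, so the sketch round-trips an Order through that serializer in memory, with no Service Bus connection involved.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Runtime.Serialization;

// Hypothetical Order contract; member names are inferred from the samples.
[DataContract]
public class Order
{
    public Order()
    {
        Products = new Dictionary<string, decimal>();
    }

    [DataMember] public int OrderId { get; set; }
    [DataMember] public Dictionary<string, decimal> Products { get; set; }
    [DataMember] public decimal Total { get; set; }
}

public static class OrderDemo
{
    // Serializes and deserializes an Order in memory, roughly what happens
    // to the message body between Send and GetBody<Order>().
    public static Order RoundTrip(Order order)
    {
        var serializer = new DataContractSerializer(typeof(Order));
        using (var stream = new MemoryStream())
        {
            serializer.WriteObject(stream, order);
            stream.Position = 0;
            return (Order)serializer.ReadObject(stream);
        }
    }

    public static void Main()
    {
        var order = new Order { OrderId = 42 };
        order.Products.Add("Kinect", 70.50M);
        order.Products.Add("XBOX 360", 199.99M);
        order.Total = order.Products["Kinect"] + order.Products["XBOX 360"];

        Order copy = RoundTrip(order);
        Console.WriteLine("Order {0} has {1} products", copy.OrderId, copy.Products.Count);
    }
}
```

Marking the members with DataMember keeps the contract explicit, so only the fields you intend to put on the wire are serialized.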
Note that MessageSender and MessageReceiver are now optional. Here’s an example that uses a MessageReceiver to PeekLock a message, complete it once it has been processed, and abandon it if processing throws an exception so that it can be received again:
    // Alternate receive approach using the entity-agnostic MessageReceiver
    MessageReceiver receiver = messagingFactory.CreateMessageReceiver(queueName);

    // Clear the previous message so the catch block never abandons a stale lock token
    recdMsg = null;

    try
    {
        // Receive and lock message
        recdMsg = receiver.Receive();

        if (recdMsg != null)
        {
            // Convert from BrokeredMessage to native Order
            recdOrder = recdMsg.GetBody<Order>();

            // Complete read, release and delete message from the fabric
            receiver.Complete(recdMsg.LockToken);

            Console.ForegroundColor = ConsoleColor.Green;
            Console.WriteLine("Received Order {0} \n\t with Message Id {1} \n\t and Lock Token:{2} \n\t from {3} \n\t with total of ${4} \n\t at {5}", recdOrder.OrderId, recdMsg.MessageId, recdMsg.LockToken, "Receiver 2", recdOrder.Total, DateTime.Now.ToString("HH:mm:ss"));
        }
    }
    catch
    {
        // Should processing fail, release the lock from the fabric and make message available for later processing.
        if (recdMsg != null)
        {
            receiver.Abandon(recdMsg.LockToken);

            Console.ForegroundColor = ConsoleColor.Red;
            Console.WriteLine("Message could not be processed.");
        }
    }
    finally
    {
        receiver.Close();
    }
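To make the PeekLock flow easier to reason about, here is a small in-memory sketch of the receive, complete and abandon cycle. It is a toy model, not the Service Bus implementation: the PeekLockQueue type and its members are hypothetical, lock expiry (LockDuration) is not modeled, and an abandoned message simply goes to the back of the toy queue.

```csharp
using System;
using System.Collections.Generic;

// Toy in-memory queue that mimics peek-lock semantics (illustrative only).
public class PeekLockQueue<T>
{
    private readonly Queue<T> available = new Queue<T>();
    private readonly Dictionary<Guid, T> locked = new Dictionary<Guid, T>();

    // Messages still owned by the queue, whether available or locked.
    public int Count { get { return available.Count + locked.Count; } }

    public void Send(T item) { available.Enqueue(item); }

    // Hands out the next message under a lock token; returns false if none is available.
    public bool TryReceive(out Guid lockToken, out T item)
    {
        if (available.Count == 0)
        {
            lockToken = Guid.Empty;
            item = default(T);
            return false;
        }
        item = available.Dequeue();
        lockToken = Guid.NewGuid();
        locked[lockToken] = item;
        return true;
    }

    // Processing succeeded: the message is removed for good.
    public void Complete(Guid lockToken)
    {
        if (!locked.Remove(lockToken))
            throw new InvalidOperationException("Unknown lock token.");
    }

    // Processing failed: the lock is released and the message becomes available again.
    public void Abandon(Guid lockToken)
    {
        T item;
        if (!locked.TryGetValue(lockToken, out item))
            throw new InvalidOperationException("Unknown lock token.");
        locked.Remove(lockToken);
        available.Enqueue(item);
    }
}

public static class PeekLockDemo
{
    public static void Main()
    {
        var queue = new PeekLockQueue<string>();
        queue.Send("Order 42");

        Guid token;
        string body;
        queue.TryReceive(out token, out body);
        queue.Abandon(token);                  // simulate a processing failure

        queue.TryReceive(out token, out body); // the same message is delivered again
        queue.Complete(token);
        Console.WriteLine("{0} processed; {1} message(s) left", body, queue.Count);
    }
}
```

The point of the two-step receive is that a message is only deleted once the receiver explicitly completes it, so a crash or exception between receive and complete does not lose the message.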
Running this sample results in order 42 being received by the QueueClient.
Topics
| May/June CTP | General Availability | Comments |
|---|---|---|
| Topic | TopicDescription | In May/June CTP, Topic / Queue / Subscription were management objects. All create/delete operations were moved to NamespaceManager and the state operations are now on TopicDescription/QueueDescription etc. |
| TopicClient | TopicClient | As noted in the Queues section, you can use TopicClient or MessageSender in the event you want to abstract details of using Topics. |
| SubscriptionClient | SubscriptionClient | As noted in the Queues section, you can use SubscriptionClient or MessageReceiver in the event you want to abstract details of using a Topic/Subscription. |
| Subscription | SubscriptionDescription | Changes to constructors and use of properties (see code samples below), but intent is the same. |
| RuleDescription | RuleDescription | Changes to constructors and use of properties (see code samples below), but intent is the same. |
| FilterExpression | Filter | Base for Filter types such as SqlFilter. |
| SqlFilterExpression | SqlFilter | |
| FilterAction | RuleAction | |
Below is a representative sample for creating, configuring, sending and receiving a message on a topic:
Administrative Operations
    using System;
    using System.Linq;
    using Microsoft.ServiceBus;
    using Microsoft.ServiceBus.Messaging;

    // Configure and create NamespaceManager for performing administrative operations
    NamespaceManagerSettings settings = new NamespaceManagerSettings();
    settings.TokenProvider = TokenProvider.CreateSharedSecretTokenProvider(issuer, key);
    NamespaceManager manager = new NamespaceManager(ServiceBusEnvironment.CreateServiceUri("sb", serviceNamespace, string.Empty), settings);

    // Check for existence of topics on the fabric
    var topics = manager.GetTopics();

    var result = from t in topics
                 where t.Path.Equals(topicName, StringComparison.OrdinalIgnoreCase)
                 select t;

    if (!result.Any())
    {
        Console.WriteLine("Topic does not exist");

        // Create Topic
        Console.WriteLine("Creating Topic...");

        manager.CreateTopic(topicName);
    }

    // Create Subscriptions for InventoryServiceSubscription and CreditServiceSubscription and associate to OrdersTopic:
    SubscriptionDescription inventoryServiceSubscription = new SubscriptionDescription(topicName, "InventoryServiceSubscription");
    SubscriptionDescription creditServiceSubscription = new SubscriptionDescription(topicName, "CreditServiceSubscription");

    // Set up Filters for NorthAmericaFulfillmentServiceSubscription
    RuleDescription northAmericafulfillmentRuleDescription = new RuleDescription();
    northAmericafulfillmentRuleDescription.Filter = new SqlFilter("CountryOfOrigin = 'USA' OR CountryOfOrigin = 'Canada' OR CountryOfOrigin = 'Mexico'");
    northAmericafulfillmentRuleDescription.Action = new SqlRuleAction("set FulfillmentRegion='North America'");

    // Create Subscriptions
    SubscriptionDescription northAmericaFulfillmentServiceSubscription = new SubscriptionDescription(topicName, "NorthAmericaFulfillmentServiceSubscription");

    // Delete existing subscriptions, ignoring failures when a subscription does not exist yet
    try { manager.DeleteSubscription(topicName, inventoryServiceSubscription.Name); } catch { }
    try { manager.DeleteSubscription(topicName, creditServiceSubscription.Name); } catch { }
    try { manager.DeleteSubscription(topicName, northAmericaFulfillmentServiceSubscription.Name); } catch { }

    // Add Subscriptions and Rules to Topic
    manager.CreateSubscription(inventoryServiceSubscription);
    manager.CreateSubscription(creditServiceSubscription);
    manager.CreateSubscription(northAmericaFulfillmentServiceSubscription, northAmericafulfillmentRuleDescription);
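To illustrate what the filter and action on the North America subscription are meant to do, here is a small in-memory sketch that expresses the same rule as plain delegates. The ToyRule and RuleDemo types are hypothetical and do not parse SQL expressions or talk to Service Bus. The sketch assumes the behavior I would expect from the service: the filter decides whether a subscription gets the message, and the action modifies only that subscription’s copy of the properties.

```csharp
using System;
using System.Collections.Generic;

// Toy stand-in for a subscription rule: a filter plus an optional action (illustrative only).
public class ToyRule
{
    public Func<IDictionary<string, object>, bool> Matches { get; set; }
    public Action<IDictionary<string, object>> Apply { get; set; }
}

public static class RuleDemo
{
    // Mirrors the SqlFilter / SqlRuleAction pair from the sample as plain delegates.
    public static ToyRule NorthAmericaRule()
    {
        var countries = new HashSet<string> { "USA", "Canada", "Mexico" };
        return new ToyRule
        {
            Matches = props =>
            {
                object country;
                return props.TryGetValue("CountryOfOrigin", out country)
                    && country is string
                    && countries.Contains((string)country);
            },
            Apply = props => props["FulfillmentRegion"] = "North America"
        };
    }

    // Returns the subscription's copy of the properties, or null when the filter rejects the message.
    public static IDictionary<string, object> Deliver(ToyRule rule, IDictionary<string, object> properties)
    {
        if (!rule.Matches(properties)) return null;
        var copy = new Dictionary<string, object>(properties);
        if (rule.Apply != null) rule.Apply(copy);
        return copy;
    }

    public static void Main()
    {
        var props = new Dictionary<string, object>
        {
            { "CountryOfOrigin", "USA" },
            { "FulfillmentRegion", "" }
        };
        var delivered = Deliver(NorthAmericaRule(), props);
        Console.WriteLine(delivered == null ? "filtered out" : (string)delivered["FulfillmentRegion"]);
    }
}
```

Subscriptions created without a rule, like the inventory and credit subscriptions in the sample, receive every message sent to the topic.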
Runtime Operations
    // Create and configure MessagingFactory to provision TopicClient
    MessagingFactorySettings runtimeSettings = new MessagingFactorySettings();
    runtimeSettings.TokenProvider = TokenProvider.CreateSharedSecretTokenProvider(issuer, key);
    MessagingFactory messagingFactory = MessagingFactory.Create(ServiceBusEnvironment.CreateServiceUri("sb", serviceNamespace, string.Empty), runtimeSettings);

    // Create Topic Client for sending messages to the Topic:
    TopicClient client = messagingFactory.CreateTopicClient(topicName);

    /***********************
     *** Send Operations ***
     ***********************/

    // Prepare BrokeredMessage and corresponding properties
    Order order = new Order();
    order.OrderId = 42;
    order.Products.Add("Kinect", 70.50M);
    order.Products.Add("XBOX 360", 199.99M);
    order.Total = order.Products["Kinect"] + order.Products["XBOX 360"];

    // Set the body to the Order data contract
    BrokeredMessage msg = new BrokeredMessage(order);

    // Set properties for use in RuleDescription
    msg.Properties.Add("CountryOfOrigin", "USA");
    msg.Properties.Add("FulfillmentRegion", "");

    // Send the message to the OrdersTopic
    client.Send(msg);
    client.Close();

    /**************************
     *** Receive Operations ***
     **************************/

    BrokeredMessage recdMsg;
    Order recdOrder;

    // Inventory Service Subscriber
    SubscriptionClient inventoryServiceSubscriber = messagingFactory.CreateSubscriptionClient(topicName, "InventoryServiceSubscription", ReceiveMode.PeekLock);

    // Read messages from the OrdersTopic until none arrive within 5 seconds
    while ((recdMsg = inventoryServiceSubscriber.Receive(TimeSpan.FromSeconds(5))) != null)
    {
        // Convert from BrokeredMessage to native Order
        recdOrder = recdMsg.GetBody<Order>();

        // Complete read, release and delete message from the fabric
        inventoryServiceSubscriber.Complete(recdMsg.LockToken);

        Console.ForegroundColor = ConsoleColor.Green;
        Console.WriteLine("Received Order {0} \n\t on {1} \n\t with Message Id {2} \n\t and Lock Token {3}.", recdOrder.OrderId, "Inventory Service Subscriber", recdMsg.MessageId, recdMsg.LockToken);
    }
    inventoryServiceSubscriber.Close();

    // Credit Service Subscriber
    SubscriptionClient creditServiceSubscriber = messagingFactory.CreateSubscriptionClient(topicName, "CreditServiceSubscription");

    // Read the message from the OrdersTopic
    recdMsg = creditServiceSubscriber.Receive();

    if (recdMsg != null)
    {
        // Convert from BrokeredMessage to native Order
        recdOrder = recdMsg.GetBody<Order>();

        // Complete read, release and delete message from the fabric
        creditServiceSubscriber.Complete(recdMsg.LockToken);

        Console.ForegroundColor = ConsoleColor.Green;
        Console.WriteLine("Received Order {0} \n\t on {1} \n\t with Message Id {2} \n\t and Lock Token {3}.", recdOrder.OrderId, "Credit Service Subscriber", recdMsg.MessageId, recdMsg.LockToken);
    }

    creditServiceSubscriber.Close();

    // Fulfillment Service Subscriber for the North America Fulfillment Service Subscription
    SubscriptionClient northAmericaFulfillmentServiceSubscriber = messagingFactory.CreateSubscriptionClient(topicName, "NorthAmericaFulfillmentServiceSubscription");

    // Read the message from the OrdersTopic for the North America Fulfillment Service Subscription
    recdMsg = northAmericaFulfillmentServiceSubscriber.Receive(TimeSpan.FromSeconds(5));

    if (recdMsg != null)
    {
        // Convert from BrokeredMessage to native Order
        recdOrder = recdMsg.GetBody<Order>();

        // Complete read, release and delete message from the fabric
        northAmericaFulfillmentServiceSubscriber.Complete(recdMsg.LockToken);

        Console.ForegroundColor = ConsoleColor.Green;
        Console.WriteLine("Received Order {0} \n\t on {1} \n\t with Message Id {2} \n\t and Lock Token {3}.", recdOrder.OrderId, "North America Fulfillment Service Subscriber", recdMsg.MessageId, recdMsg.LockToken);
    }
    else
    {
        Console.ForegroundColor = ConsoleColor.Yellow;
        Console.WriteLine("No messages for North America found.");
    }

    northAmericaFulfillmentServiceSubscriber.Close();
When running this sample, you’ll see that Order 42 is received on the Inventory, Credit and North America Fulfillment Service subscriptions.
WCF
One of the great things about the WCF programming model is that it abstracts much of the underlying communication details. As such, other than dropping in a new assembly and refactoring the binding and configuration, it is not greatly affected by the API changes from the May/June CTP to GA.
As I mentioned, one thing that has changed is that the ServiceBusMessagingBinding has been renamed to NetMessagingBinding. I’ll be covering an end-to-end example of using the NetMessagingBinding in my upcoming article in CODE Magazine.
REST API
The REST API is key to delivering these new capabilities across a variety of client platforms and remains largely unchanged. One key change, however, is how message properties are handled: instead of an individual header for each property, there is now one header with all the properties JSON encoded. Please refer to the updated REST API Reference doc for details. I’ll also be covering an end-to-end example of using the REST API to write to and read from a queue in my upcoming article in CODE Magazine.
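As a rough illustration of the single-header approach, the sketch below JSON-encodes a handful of properties into one header value. The header name BrokerProperties and the property names are assumptions based on how I understand the GA REST API, so check them against the REST API Reference. The sketch also assumes a runtime where System.Text.Json is available, which is newer than the .NET 4.0 target discussed earlier; on .NET 4.0 you would substitute another JSON serializer.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

public static class BrokerPropertiesDemo
{
    // Assumed header name; verify against the REST API Reference.
    public const string HeaderName = "BrokerProperties";

    // Encodes the given properties as a single JSON object suitable for one header value.
    public static string Encode(IDictionary<string, object> properties)
    {
        return JsonSerializer.Serialize(properties);
    }

    public static void Main()
    {
        // Property names here are illustrative assumptions.
        var properties = new Dictionary<string, object>
        {
            { "MessageId", "42" },
            { "Label", "Order" },
            { "TimeToLive", 30 }
        };
        Console.WriteLine("{0}: {1}", HeaderName, Encode(properties));
    }
}
```

A client on any platform can build the same header value with whatever JSON library it has, which is what makes this approach friendlier to non-.NET callers than one header per property.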
More Coming Soon
As I mentioned, in my upcoming article in CODE Magazine, I’ll cover the Why, What, and How behind Azure AppFabric Service Bus Brokered Messaging, including end-to-end walkthroughs with the .NET Client API, REST API and WCF Binding. The November/December issue should be on newsstands (including Barnes and Noble) or in your mailbox towards the end of October. You can also find the article online at http://code-magazine.com
Resources
You can learn more about this exciting release as well as download the GA SDK version 1.5 by visiting the following resources: