Azure Service Bus Brokered Messaging provides durable pull-based pub-sub, complimenting it’s older sibling Relay Messaging which uses a push messaging model. While both enable hybrid composition across traditional business, trust and network boundaries, they provide unique capabilities in and of themselves.
As with Relay Messaging, Brokered Messaging provides first class support for WCF with the NetMessagingBinding, but expands the developer surface to general .NET and cross-platform/mobility scenarios by offering the .NET Client and REST API respectively.
Of the 3 APIs, the .NET Client API is the most robust and seems to be the most documented.
The simplicity of the WCF programming model (the illusion that messages are being pushed to your endpoint) is balanced with some restrictions that naturally fall out of the scope of one-way messaging including queue/topic/subscription/rule creation and support for peek lock.
In this regard, while not as robust as the .NET Client API, the REST API offers a more comprehensive feature set and when working on solutions that must be interoperable across client platforms or due to other restrictions, the REST API is a great choice.
Please note that my goal is not to be elegant or use the tersest or most fluid syntax possible in this samples, but rather to get some quick and dirty examples out there, well, quickly.
As such, the unit tests should be self explanatory, but if you have any questions, please don’t hesitate to ask.
1: using System;
2: using System.Text;
3: using System.Collections.Generic;
4: using System.Linq;
5: using Microsoft.VisualStudio.TestTools.UnitTesting;
6: using System.Collections.Specialized;
7: using System.Net;
8: using System.Runtime.Serialization.Json;
9: using System.Runtime.Serialization;
10: using System.IO;
11:
12: namespace RESTAPITests
13: {
14: [TestClass]
15: public class RESTAPITests
16: {
17:
18: static string serviceNamespace = "[NAMESPACE]";
19: static string issuerName = "owner";
20: static string issuerSecret = "[KEY]";
21: const string sbHostName = "servicebus.windows.net";
22: const string acsHostName = "accesscontrol.windows.net";
23: static string relativeAddress = "[Queue]";
24: static string baseAddress;
25:
26:
27: [TestMethod]
28: public void SendMessageShouldSucceedWithoutError()
29: {
30: string body = "foo";
31:
32: var token = GetToken(issuerName, issuerSecret);
33:
34: baseAddress = GetBaseAddress();
35: string fullAddress = baseAddress + relativeAddress + "/messages";
36:
37: WebClient webClient = new WebClient();
38: webClient.Headers[HttpRequestHeader.Authorization] = token;
39: webClient.UploadData(fullAddress, "POST", Encoding.UTF8.GetBytes(body));
40:
41: }
42:
43: [TestMethod]
44: public void PeekLockMessageShouldReturnLockId()
45: {
46: var token = GetToken(issuerName, issuerSecret);
47:
48: baseAddress = GetBaseAddress();
49:
50: // Read and lock the message. Unless released, the lock will expire within the configured lock duration (on the queue)
51: string fullAddress = baseAddress + relativeAddress + "/messages/head";
52:
53: WebClient webClient = new WebClient();
54: webClient.Headers[HttpRequestHeader.Authorization] = token;
55: webClient.UploadData(fullAddress, "POST", new byte[0]{});
56:
57: var props = webClient.ResponseHeaders["BrokerProperties"];
58:
59: // Deserialize the JSON header to a simple class
60: DataContractJsonSerializer serializer = new DataContractJsonSerializer(typeof(BrokerProperty));
61: using (MemoryStream stream = new MemoryStream(Encoding.Unicode.GetBytes(props)))
62: {
63: var result = (BrokerProperty)serializer.ReadObject(stream);
64:
65: Assert.IsNotNull(result.LockToken);
66: }
67:
68: }
69:
70: [TestMethod]
71: public void PeekLockMessageAndAbandonShouldSucceed()
72: {
73: var token = GetToken(issuerName, issuerSecret);
74:
75: baseAddress = GetBaseAddress();
76:
77: // Read and lock the message. Unless released, the lock will expire within the configured lock duration (on the queue)
78: string fullAddress = baseAddress + relativeAddress + "/messages/head";
79:
80: WebClient webClient = new WebClient();
81: webClient.Headers[HttpRequestHeader.Authorization] = token;
82: webClient.UploadData(fullAddress, "POST", new byte[0] { });
83:
84: var props = webClient.ResponseHeaders["BrokerProperties"];
85:
86: // Deserialize the JSON header to a simple class
87: DataContractJsonSerializer serializer = new DataContractJsonSerializer(typeof(BrokerProperty));
88:
89: using (MemoryStream stream = new MemoryStream(Encoding.Unicode.GetBytes(props)))
90: {
91: var result = (BrokerProperty)serializer.ReadObject(stream);
92:
93: // Bail on the message, release the lock so it is available for another consumer
94: fullAddress = baseAddress + relativeAddress + String.Format("/messages/{0}/{1}", result.MessageId, result.LockToken);
95: }
96:
97: webClient = new WebClient();
98: webClient.Headers[HttpRequestHeader.Authorization] = token;
99: webClient.UploadData(fullAddress, "PUT", new byte[0] { });
100:
101: }
102:
103: [TestMethod]
104: public void PeekLockMessageAndCompleteShouldSucceed()
105: {
106: var token = GetToken(issuerName, issuerSecret);
107:
108: baseAddress = GetBaseAddress();
109:
110: // Peek lock the message
111: string fullAddress = baseAddress + relativeAddress + "/messages/head";
112:
113: WebClient webClient = new WebClient();
114: webClient.Headers[HttpRequestHeader.Authorization] = token;
115: webClient.UploadData(fullAddress, "POST", new byte[0] { });
116:
117: var props = webClient.ResponseHeaders["BrokerProperties"];
118:
119: DataContractJsonSerializer serializer = new DataContractJsonSerializer(typeof(BrokerProperty));
120:
121: using (MemoryStream stream = new MemoryStream(Encoding.Unicode.GetBytes(props)))
122: {
123: var result = (BrokerProperty)serializer.ReadObject(stream);
124:
125: // Complete the read operation, releasing the lock and deleting the message from the queue
126: fullAddress = baseAddress + relativeAddress + String.Format("/messages/{0}/{1}", result.MessageId, result.LockToken);
127: }
128: webClient = new WebClient();
129: webClient.Headers[HttpRequestHeader.Authorization] = token;
130: webClient.UploadData(fullAddress, "DELETE", new byte[0] { });
131: }
132:
133: [TestMethod]
134: public void DecodeJsonToTypeShouldAllowEasyExtractionOfProps()
135: {
136:
137: string payload = @"{""DeliveryCount"":3,""LockToken"":""4a1d4c96-9837-42a9-ad91-3ecf704eec40"",""LockedUntilUtc"":""Thu, 19 Jan 2012 01:22:44 GMT"",
""MessageId"":""4a4fa2c7d87a40a7b799625b9de69e42"",""SequenceNumber"":2,""TimeToLive"":922337203685}";
138:
139: DataContractJsonSerializer serializer = new DataContractJsonSerializer(typeof(BrokerProperty));
140:
141: using (MemoryStream stream = new MemoryStream(Encoding.Unicode.GetBytes(payload)))
142: {
143: var result = (BrokerProperty)serializer.ReadObject(stream);
144:
145: Assert.IsNotNull(result.MessageId);
146: }
147:
148: }
149:
150: [DataContract]
151: public class BrokerProperty
152: {
153: [DataMember]
154: public string DeliveryCount { get; set; }
155: [DataMember]
156: public string LockToken { get; set; }
157: [DataMember]
158: public string LockedUntilUtc { get; set; }
159: [DataMember]
160: public string MessageId { get; set; }
161: [DataMember]
162: public string SequenceNumber { get; set; }
163: [DataMember]
164: public string TimeToLive { get; set; }
165: }
166:
167:
168: // Helper
169: private static string GetBaseAddress()
170: {
171: return baseAddress = "https://" + serviceNamespace + "." + sbHostName + "/";
172: }
173:
174: // Helper, warmly borrowed from Service Bus Management Sample :-)
175: private static string GetToken(string issuerName, string issuerSecret)
176: {
177: var acsEndpoint = "https://" + serviceNamespace + "-sb." + acsHostName + "/WRAPv0.9/";
178:
179: // Note that the realm used when requesting a token uses the HTTP scheme, even though
180: // calls to the service are always issued over HTTPS
181: var realm = "http://" + serviceNamespace + "." + sbHostName + "/";
182:
183: NameValueCollection values = new NameValueCollection();
184: values.Add("wrap_name", issuerName);
185: values.Add("wrap_password", issuerSecret);
186: values.Add("wrap_scope", realm);
187:
188: WebClient webClient = new WebClient();
189: byte[] response = webClient.UploadValues(acsEndpoint, values);
190:
191: string responseString = Encoding.UTF8.GetString(response);
192:
193: var responseProperties = responseString.Split('&');
194: var tokenProperty = responseProperties[0].Split('=');
195: var token = Uri.UnescapeDataString(tokenProperty[1]);
196:
197: return "WRAP access_token=\"" + token + "\"";
198: }
199:
200: }
201:
202: }