Recently, I had to implement an Azure Topic subscription within my Xamarin app.
AzureTopicListener
The code below is what I built to get my Xamarin app to subscribe to an Azure Service Bus topic through a subscription that relies on a filter rule:
namespace OrderRequest.DataGateway

open System
open System.Linq
open System.Diagnostics
open System.Threading.Tasks
open System.Threading
open System.Text
open Microsoft.Azure.ServiceBus
open Microsoft.Azure.ServiceBus.Management
open Newtonsoft.Json
open OrderRequest.Core

/// Identifies the Service Bus topic/subscription pair to listen on and the
/// connection string used to reach the namespace.
type SubscriptionInfo = {
    Topic            : string
    Subscription     : string
    connectionString : string
}

/// A name/value pair that is turned into a SQL filter on the message label
/// (`sys.Label='Name(Value)'`).
type Filter = { Name:string; Value:string }

/// Listens on an Azure Service Bus topic subscription and republishes every
/// received message through `IEventNotifier.IncomingRequest`.
///
/// The subscription is restricted by a SQL filter rule named `Filter.<Name>`
/// that matches messages whose label equals `<Name>(<Value>)`.
type AzureTopicListener(subscriptionInfo:SubscriptionInfo, filter:Filter) as x =

    let topic            = subscriptionInfo.Topic
    let subscription     = subscriptionInfo.Subscription
    let connectionString = subscriptionInfo.connectionString

    // Guards EnableAsync so the set-up runs once. It is static, so it is shared
    // by every AzureTopicListener instance in the process.
    static let mutable flag = false

    // The payload type is inferred from IEventNotifier.Notify.
    let requested = Event<_>()

    let mutable subscriptionClient : SubscriptionClient = null

    let ruleName = sprintf "Filter.%s" filter.Name

    let ruleDescription =
        let courierIdFilter      = sprintf "%s(%s)" filter.Name filter.Value
        let courierIdLabelFilter = SqlFilter(sprintf "sys.Label='%s'" courierIdFilter)
        RuleDescription(ruleName, courierIdLabelFilter)

    /// Creates the subscription (with the filter rule as its initial rule) when
    /// the rule is not already present. Any failure is logged and swallowed so
    /// that EnableAsync can still try to attach to an existing subscription.
    let createSubscriptionFilterAsync() =
        async {
            try
                let managementClient = ManagementClient(connectionString)
                let! rulesFound = managementClient.GetRulesAsync(topic, subscription) |> Async.AwaitTask

                let hasRule = rulesFound.Any(fun r -> r.Name = ruleName)
                if not hasRule then
                    let subscriptionDescription = SubscriptionDescription(topic, subscription)
                    do! managementClient.CreateSubscriptionAsync(subscriptionDescription, ruleDescription)
                        |> Async.AwaitTask
                        |> Async.Ignore

            with ex -> Debug.WriteLine(ex.GetBaseException().Message)
        }

    /// Logs errors raised by the Service Bus message pump.
    let exceptionReceivedHandler (args:ExceptionReceivedEventArgs) =
        Debug.WriteLine(args.Exception.GetBaseException().Message)
        Task.CompletedTask

    /// Message-pump callback: completes the message, deserializes its UTF-8
    /// JSON body and raises the notification event. Failures are logged.
    ///
    /// The work is returned as a real Task instead of being forced with
    /// Async.RunSynchronously, so the pump thread is not blocked while the
    /// completion round-trip is in flight.
    let processMessageAsync (message:Message) (_:CancellationToken) : Task =
        let work =
            async {
                try
                    let json = Encoding.UTF8.GetString(message.Body)

                    // AutoComplete is off, so the message is settled explicitly.
                    do! subscriptionClient.CompleteAsync(message.SystemProperties.LockToken)
                        |> Async.AwaitTask

                    // NOTE(review): the explicit type argument was lost from the
                    // published listing; `<_>` lets the compiler infer it from
                    // the parameter type of IEventNotifier.Notify.
                    let courierResponse = JsonConvert.DeserializeObject<_>(json)
                    (x :> IEventNotifier).Notify courierResponse

                with ex -> Debug.WriteLine(ex.GetBaseException().Message)
            }
        (work |> Async.StartAsTask) :> Task

    interface IEventNotifier with

        /// True once a subscription client exists and is neither closed nor closing.
        member x.IsEnabled
            with get() = subscriptionClient <> null && not subscriptionClient.IsClosedOrClosing

        /// Raises IncomingRequest with the given payload.
        member x.Notify(order) = requested.Trigger order

        /// Ensures the subscription and its filter rule exist, then starts the
        /// message pump. Runs at most once until DisableAsync resets the guard.
        /// Errors are logged, not propagated.
        member x.EnableAsync () =
            async {
                try
                    if not flag then
                        flag <- true
                        do! createSubscriptionFilterAsync()

                        subscriptionClient <- new SubscriptionClient(connectionString, topic, subscription)
                        // NOTE(review): the original timeout value was lost from the
                        // published listing; 3 minutes is an assumption - confirm.
                        subscriptionClient.OperationTimeout <- TimeSpan.FromMinutes(3.0)

                        let! rulesFound = subscriptionClient.GetRulesAsync() |> Async.AwaitTask

                        // The default rule matches every message, so it must go
                        // for the filter to have any effect.
                        let hasDefaultRule = rulesFound.Any(fun r -> r.Name = RuleDescription.DefaultRuleName)
                        if hasDefaultRule then
                            do! subscriptionClient.RemoveRuleAsync(RuleDescription.DefaultRuleName) |> Async.AwaitTask

                        // Checked independently of the default rule: previously the
                        // filter was only added when no default rule was found, which
                        // left a pre-existing subscription with no rules at all.
                        let alreadyHasRule = rulesFound.Any(fun r -> r.Name = ruleName)
                        if not alreadyHasRule then
                            do! subscriptionClient.AddRuleAsync(ruleDescription) |> Async.AwaitTask

                        let msgOptions = MessageHandlerOptions(fun args -> exceptionReceivedHandler(args))
                        msgOptions.AutoComplete         <- false
                        msgOptions.MaxAutoRenewDuration <- TimeSpan.FromMinutes(1.0)
                        // NOTE(review): value lost from the published listing; 1 is
                        // an assumption - confirm.
                        msgOptions.MaxConcurrentCalls   <- 1

                        subscriptionClient.RegisterMessageHandler(processMessageAsync, msgOptions)

                with ex -> Debug.WriteLine(ex.GetBaseException().Message)

            } |> Async.StartAsTask

        /// Closes the subscription client if it is open and resets the guard so
        /// that EnableAsync can run again. Errors are logged, not propagated.
        member x.DisableAsync() =
            async {
                try
                    // Only close a client that exists and is still open; the
                    // previous condition was inverted and also dereferenced a
                    // null client when called before EnableAsync.
                    if subscriptionClient <> null && not subscriptionClient.IsClosedOrClosing then
                        do! subscriptionClient.CloseAsync() |> Async.AwaitTask

                    flag <- false

                with ex -> Debug.WriteLine(ex.GetBaseException().Message)

            } |> Async.StartAsTask

        /// Published event raised for every message passed to Notify.
        [<CLIEvent>]
        member x.IncomingRequest = requested.Publish
Test
Here’s a test that I wrote:
module OrderRequest.Tests.RequestListener

open NUnit.Framework
open OrderRequest.DataGateway
open OrderRequest.Core
open OrderRequest.Specification

/// Integration test: enables the listener against a real Service Bus
/// namespace. Replace the placeholder connection string before running.
[<Test>]
let ``run Azure TopicListener``() =

    // Setup
    let filter = { Name="courier-id"; Value="b965f552-31a4-4644-a9c6-d86dd45314c4" }

    let subscriptionInfo = {
        Topic            = "Topic.courier-responded"
        Subscription     = "Subscription.all-messages"
        connectionString = "some_connection_string"
    }

    let topicListener = AzureTopicListener(subscriptionInfo, filter) :> IEventNotifier

    // Test
    // An async workflow is only a description of work; it has to be run
    // explicitly, otherwise the test finishes without calling EnableAsync.
    async { do! topicListener.EnableAsync() |> Async.AwaitTask }
    |> Async.RunSynchronously

    // Verify
    // EnableAsync logs and swallows its own exceptions, so IsEnabled is the
    // only observable signal that the listener actually started.
    Assert.IsTrue(topicListener.IsEnabled)