Xamarin: Building a Real-time Listener for an Azure Topic

Recently, I had to implement an Azure Topic subscription within my Xamarin app.


The code below is what I built to get my Xamarin app to subscribe to an Azure topic that relies on a filter rule:

namespace OrderRequest.DataGateway

open System
open System.Linq
open System.Diagnostics
open System.Threading.Tasks
open System.Threading
open System.Text
open Microsoft.Azure.ServiceBus
open Microsoft.Azure.ServiceBus.Management
open Newtonsoft.Json
open OrderRequest.Core

/// Identifies one Azure Service Bus topic subscription and how to connect to it.
type SubscriptionInfo = {
    /// Topic name, passed as the topic path to the Service Bus clients.
    Topic            : string
    /// Subscription name under that topic.
    Subscription     : string
    /// Service Bus connection string. The lowercase field name is kept
    /// because callers construct the record using this exact label.
    connectionString : string
}

/// Name/value pair from which the subscription's rule name and label filter are built.
type Filter =
    { /// Filter key; also used as the suffix of the rule name.
      Name  : string
      /// Filter value matched (together with Name) against the message label.
      Value : string }

/// Listens to an Azure Service Bus topic subscription and republishes every
/// received message through the `IncomingRequest` event.
///
/// The subscription is narrowed by a SQL rule named "Filter.<filter.Name>"
/// that matches messages whose label equals "<filter.Name>(<filter.Value>)".
/// All failures are logged via `Debug.WriteLine` and never propagate to callers.
type AzureTopicListener(subscriptionInfo:SubscriptionInfo, filter:Filter) as x =
    let topic            = subscriptionInfo.Topic
    let subscription     = subscriptionInfo.Subscription
    let connectionString = subscriptionInfo.connectionString

    // Static, so it is shared by every instance: once any listener has been
    // enabled, EnableAsync is a no-op until DisableAsync resets the flag.
    static let mutable flag = false

    // Payload type is inferred from IEventNotifier.Notify.
    let requested = Event<_>()

    // Stays null until EnableAsync succeeds in creating the client.
    let mutable subscriptionClient : SubscriptionClient = null

    let ruleName = sprintf "Filter.%s" filter.Name

    let ruleDescription =
        let courierIdFilter      = sprintf "%s(%s)" filter.Name filter.Value
        let courierIdLabelFilter = SqlFilter(sprintf "sys.Label='%s'" courierIdFilter)

        RuleDescription(ruleName, courierIdLabelFilter)

    /// Creates the subscription together with the filter rule when no rule
    /// named `ruleName` is found on it. Errors are logged and swallowed.
    let createSubscriptionFilterAsync() =
        async {
            try
                let  managementClient = ManagementClient(connectionString)
                let! rulesFound       = managementClient.GetRulesAsync(topic, subscription) |> Async.AwaitTask
                let  hasRule          = rulesFound.Any(fun r -> r.Name = ruleName)

                if not hasRule then
                    let subscriptionDescription = SubscriptionDescription(topic, subscription)
                    do! managementClient.CreateSubscriptionAsync(subscriptionDescription, ruleDescription)
                        |> Async.AwaitTask
                        |> Async.Ignore
            with ex -> Debug.WriteLine(ex.GetBaseException().Message)
        }

    /// Invoked by the message pump when receiving or handling a message fails.
    // NOTE(review): the body was missing from the published listing; logging
    // the base exception mirrors the other handlers in this type — confirm.
    let exceptionReceivedHandler (args:ExceptionReceivedEventArgs) =
        Debug.WriteLine(args.Exception.GetBaseException().Message)
        Task.CompletedTask

    /// Completes the received message, deserialises its UTF-8 JSON body and
    /// forwards the result to `Notify`. Returns the running task instead of
    /// blocking the pump thread; failures are logged and swallowed.
    let processMessageAsync (message:Message) (_:CancellationToken) : Task =
        let work =
            async {
                try
                    let json = Encoding.UTF8.GetString(message.Body)
                    // The message is completed before subscribers are notified, as in the original listing.
                    do! subscriptionClient.CompleteAsync(message.SystemProperties.LockToken) |> Async.AwaitTask

                    // NOTE(review): a generic type argument on DeserializeObject may have
                    // been lost from the published listing — confirm against IEventNotifier.Notify.
                    let courierResponse = JsonConvert.DeserializeObject(json)

                    (x :> IEventNotifier).Notify courierResponse
                with ex -> Debug.WriteLine(ex.GetBaseException().Message)
            }

        Async.StartAsTask(work) :> Task

    interface IEventNotifier with

        /// True once a client exists and is neither closed nor closing.
        member x.IsEnabled with get() = not (isNull subscriptionClient) && not subscriptionClient.IsClosedOrClosing

        /// Raises `IncomingRequest` with the given payload.
        member x.Notify(order) = requested.Trigger order

        /// Ensures the filter rule is in place, removes the default rule and
        /// starts the message pump. Does nothing if the shared flag is already set.
        member x.EnableAsync () =
            async {
                try
                    if not flag then
                        flag <- true

                        do! createSubscriptionFilterAsync()
                        subscriptionClient <- new SubscriptionClient(connectionString, topic, subscription)
                        // NOTE(review): the listing also assigned subscriptionClient.OperationTimeout
                        // here, but the value was lost; the client's default timeout applies.

                        let! rulesFound     = subscriptionClient.GetRulesAsync() |> Async.AwaitTask
                        let  hasDefaultRule = rulesFound.Any(fun r -> r.Name = RuleDescription.DefaultRuleName)

                        if hasDefaultRule then
                            do! subscriptionClient.RemoveRuleAsync(RuleDescription.DefaultRuleName) |> Async.AwaitTask

                        // Rule setup and handler registration must not depend on the default
                        // rule still existing; otherwise a second launch never starts listening.
                        let alreadyHasRule = rulesFound.Any(fun r -> r.Name = ruleName)

                        if not alreadyHasRule then
                            do! subscriptionClient.AddRuleAsync(ruleDescription) |> Async.AwaitTask

                        let msgOptions = MessageHandlerOptions(fun args -> exceptionReceivedHandler(args))
                        msgOptions.AutoComplete         <- false
                        msgOptions.MaxAutoRenewDuration <- TimeSpan.FromMinutes(1.0)
                        // NOTE(review): the value assigned to msgOptions.MaxConcurrentCalls was
                        // lost from the listing; it is left at the library default.

                        subscriptionClient.RegisterMessageHandler(
                            (fun message token -> processMessageAsync message token),
                            msgOptions)
                with ex -> Debug.WriteLine(ex.GetBaseException().Message)

            } |> Async.StartAsTask

        /// Closes the client if one is open and resets the shared flag so
        /// that EnableAsync can run again.
        member x.DisableAsync() =
            async {
                try
                    // Only close a client that exists and is still open.
                    if not (isNull subscriptionClient) && not subscriptionClient.IsClosedOrClosing then
                        do! subscriptionClient.CloseAsync() |> Async.AwaitTask

                    flag <- false
                with ex -> Debug.WriteLine(ex.GetBaseException().Message)

            } |> Async.StartAsTask

    /// Event raised for every payload passed to `Notify`.
    member x.IncomingRequest = requested.Publish


Here’s a test that I wrote:

module OrderRequest.Tests.RequestListener

open NUnit.Framework
open OrderRequest.DataGateway
open OrderRequest.Core
open OrderRequest.Specification

/// Integration smoke test: builds a listener for a courier-specific filter and
/// enables it. The connection string is a placeholder and must be replaced
/// with a real one to reach Service Bus.
// NOTE(review): the published listing is cut off after EnableAsync; any
// further steps or assertions it contained are not reproduced here.
[<Test>]
let ``run Azure TopicListener``() =

    // Setup
    let filter = { Name = "courier-id"; Value = "b965f552-31a4-4644-a9c6-d86dd45314c4" }

    let subscriptionInfo = {
        Topic            = "Topic.courier-responded"
        Subscription     = "Subscription.all-messages"
        connectionString = "some_connection_string"
    }

    let topicListener = AzureTopicListener(subscriptionInfo, filter) :> IEventNotifier

    // Test
    async {
        do! topicListener.EnableAsync() |> Async.AwaitTask
    } |> Async.RunSynchronously

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

%d bloggers like this: