Azure Topic Subscription using Filter

Here’s how I once implemented an Azure Service Bus topic subscription with an optional filter rule in F#:

namespace OrderRequest.DataGateway

open System
open System.Text
open System.Linq
open System.Diagnostics
open System.Threading.Tasks
open System.Threading
open Microsoft.Azure.ServiceBus
open Microsoft.Azure.ServiceBus.Management
open Newtonsoft.Json
open OrderRequest.Core
open OrderRequest.Specification.DataTransfer

/// Listens on an Azure Service Bus topic subscription and republishes each received
/// message through the `Notification` event.
///
/// The constructor reads the topic, subscription, connection string and optional filter
/// from `subscriptionInfo`.
///
/// NOTE(review): this listing appears to have lost every `<...>` span during HTML
/// extraction (generic arguments, the `<>` operator, attributes, and whole runs of code
/// between a `<` and a later `>`). The affected lines are flagged individually below;
/// the block will not compile until they are restored from the original source.
type AzureTopicListener(subscriptionInfo:SubscriptionInfo) as x =
    
    let topic            = subscriptionInfo.Topic
    let subscription     = subscriptionInfo.Subscription
    let connectionString = subscriptionInfo.ConnectionString
    let filterOption     = subscriptionInfo.Filter

    // Static, so shared by every instance of this type: EnableAsync only performs its
    // setup while this is false, and sets it to true before doing so.
    static let mutable flag = false

    // Event triggered by Notify and exposed through the Notification member.
    // NOTE(review): a generic argument was presumably present here (e.g. `Event<_>()`) — confirm.
    let requested = Event()

    // Remains null until EnableAsync assigns a client.
    let mutable subscriptionClient : SubscriptionClient = null

    // NOTE(review): L34 and L40 of the listing are truncated. The text between
    // `Some` and `function` (and between `Some` and `Async.AwaitTask`) is missing, which
    // presumably included the rest of `ruleNameOption`, the definition of
    // `ruleDescriptionOption`, and the opening of `createSubscriptionFilterAsync`
    // (its `async {`, `try`, and the bindings of `managementClient` and `rulesFound`).
    let ruleNameOption = filterOption |> function
        | None        -> None
        | Some filter -> Some  function
        | None -> None
        | Some filter ->

            ruleNameOption |> function
            | None      -> None
            | Some name -> Some  Async.AwaitTask

                // True when a rule with the configured name is already among rulesFound;
                // false when no rule name is configured.
                let  hasRule = ruleNameOption |> function
                               | None -> false
                               | Some ruleName -> rulesFound.Any(fun r -> r.Name = ruleName)

                if not (hasRule) then

                    // When a rule description is available, create the subscription with that
                    // rule attached. The call is run synchronously (blocking) inside this branch.
                    ruleDescriptionOption 
                    |> function
                       | None -> ()
                       | Some ruleDescription ->

                            async {
                        
                                let subscriptionDescription = SubscriptionDescription(topic, subscription)
                                do! managementClient.CreateSubscriptionAsync(subscriptionDescription, ruleDescription) |> Async.AwaitTask |> Async.Ignore

                            } |> Async.RunSynchronously
            
            // Failures are only written to debug output; nothing is rethrown.
            // (The matching `try` is in the missing part of the listing.)
            with
                ex -> Debug.WriteLine(ex.GetBaseException().Message)
        }

    /// Exception callback handed to MessageHandlerOptions: writes the base exception's
    /// message to debug output and returns an already-completed task.
    let exceptionReceivedHandler (args:ExceptionReceivedEventArgs) =

        Debug.WriteLine(args.Exception.GetBaseException().Message)
        Task.CompletedTask

    /// Handles one received message: decodes the body as UTF-8 JSON, completes the message,
    /// deserializes the payload and forwards it to Notify. Always returns Task.CompletedTask;
    /// exceptions are written to debug output instead of propagating.
    let processMessageAsync (message:Message) (_:CancellationToken) = 
    
        try

            let json = Encoding.UTF8.GetString(message.Body)
            // NOTE(review): the message is completed (blocking) before it is deserialized and
            // dispatched, so a failure in the steps below occurs after completion — confirm
            // this ordering is intended.
            subscriptionClient.CompleteAsync(message.SystemProperties.LockToken) |> Async.AwaitTask |> Async.RunSynchronously

            // NOTE(review): a generic type argument was presumably present on
            // DeserializeObject in the original source — confirm the payload type.
            let payload = JsonConvert.DeserializeObject(json)

            (x :> IEventNotifier).Notify payload

            Task.CompletedTask

        with
            ex -> 
                async {

                    Debug.WriteLine(ex.GetBaseException().Message)
                    return Task.CompletedTask

                } |> Async.RunSynchronously

    interface IEventNotifier with

        // NOTE(review): the comparison operator between `subscriptionClient` and `null` is
        // missing from the listing — presumably `<>`; confirm.
        member x.IsEnabled with get() = subscriptionClient  null && not (subscriptionClient.IsClosedOrClosing)

        /// Raises the `requested` event with the given order.
        member x.Notify(order) = requested.Trigger order

        /// Performs the one-time setup (guarded by the static `flag`): awaits
        /// createSubscriptionFilterAsync, creates the SubscriptionClient, removes the default
        /// rule if present, adds the configured rule if absent, and builds the message handler
        /// options. Exceptions are written to debug output. Returns the started Task.
        member x.EnableAsync() =
        
            async {

                try
                    if not (flag) then
                        
                        flag <- true

                        do! createSubscriptionFilterAsync()
                        
                        subscriptionClient <- new SubscriptionClient(connectionString, topic, subscription)
                        // NOTE(review): this line is truncated. The value assigned to
                        // OperationTimeout and the code that binds `rulesFound` (and opens the
                        // more deeply indented scope below) are missing from the listing.
                        subscriptionClient.OperationTimeout  Async.AwaitTask
                            let  hasDefaultRule = rulesFound.Any(fun r -> r.Name = RuleDescription.DefaultRuleName)

                            if hasDefaultRule then
                                do! subscriptionClient.RemoveRuleAsync(RuleDescription.DefaultRuleName) |> Async.AwaitTask

                            ruleNameOption |> function
                            | None -> ()
                            | Some ruleName ->

                                let alreadyHasRule = rulesFound.Any(fun r -> r.Name = ruleName)

                                if not (alreadyHasRule) then

                                    ruleDescriptionOption |> function
                                    | None -> ()
                                    | Some ruleDescription ->

                                        // Started with Async.Start and not awaited here.
                                        async {
                                            do! subscriptionClient.AddRuleAsync(ruleDescription) |> Async.AwaitTask
                                        } |> Async.Start

                            let msgOptions = MessageHandlerOptions(fun args -> exceptionReceivedHandler(args))
                            // AutoComplete is off; processMessageAsync calls CompleteAsync itself.
                            msgOptions.AutoComplete         <- false
                            msgOptions.MaxAutoRenewDuration <- TimeSpan.FromMinutes(1.0)
                            // NOTE(review): this line is truncated. The MaxConcurrentCalls value
                            // and whatever followed (presumably the message-handler registration)
                            // are missing from the listing.
                            msgOptions.MaxConcurrentCalls    Async.RunSynchronously

                with 
                    ex -> Debug.WriteLine(ex.GetBaseException().Message)

            } |> Async.StartAsTask

        /// Closes the subscription client; exceptions are written to debug output.
        /// Returns the started Task.
        member x.DisableAsync() =

            async { 
            
                try
                    // NOTE(review): this closes only when the client is ALREADY closed or
                    // closing, which looks inverted — confirm against the original source.
                    if subscriptionClient.IsClosedOrClosing then
                        do! subscriptionClient.CloseAsync() |> Async.AwaitTask

                    // NOTE(review): this line is truncated. The assignment to `flag`
                    // (presumably resetting it) and the `with ex ->` handler header are missing.
                    flag  Debug.WriteLine(ex.GetBaseException().Message)

            } |> Async.StartAsTask

    // NOTE(review): the attribute contents were lost in extraction — presumably
    // `[<CLIEvent>]`; confirm.
    []
    /// Published side of the `requested` event.
    member x.Notification = requested.Publish

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. ( Log Out / Change )

Google photo

You are commenting using your Google account. ( Log Out / Change )

Twitter picture

You are commenting using your Twitter account. ( Log Out / Change )

Facebook photo

You are commenting using your Facebook account. ( Log Out / Change )

Connecting to %s