F#: Azure Service Bus (Topic Subscription)

Azure Topic Listener

namespace Notifications.DataGateway

open System
open System.Text
open System.Diagnostics
open System.Threading.Tasks
open Azure.Messaging.ServiceBus
open Newtonsoft.Json
open Notifications.DataTransfer
open Notifications.DataGateway

type AzureTopicListener<'T>(subscriptionInfo:SubscriptionInfo) as x =
    
    let topic            = subscriptionInfo.Topic
    let subscription     = subscriptionInfo.Subscription
    let connectionString = subscriptionInfo.ConnectionString

    let requested = Event<_>()

    let mutable serviceBusClient : ServiceBusClient    = null
    let mutable processor        : ServiceBusProcessor = null

    let exceptionReceivedHandler (args:ProcessErrorEventArgs) =

        Debug.WriteLine(sprintf "\nERROR: %s\n" <| args.Exception.GetBaseException().Message)
        Task.CompletedTask

    let processMessageAsync (v:ProcessMessageEventArgs) = 
    
        async {

            Debug.WriteLine $"\nProcessing message for: {subscriptionInfo.Topic}\n"

            try
                do! v.CompleteMessageAsync v.Message |> Async.AwaitTask

                let json    = Encoding.UTF8.GetString(v.Message.Body.ToArray())
                let payload = JsonConvert.DeserializeObject<'T>(json)

                (x :> IEventNotifier<'T>).Notify payload

                return Task.CompletedTask

            with ex -> 

                let error = ex.GetBaseException().Message
                Debug.WriteLine(error)
                return Task.CompletedTask

        } |> Async.RunSynchronously

    interface IEventNotifier<'T> with

        member x.IsEnabled with get() = serviceBusClient <> null && not (serviceBusClient.IsClosed)

        member x.Notify(v) = requested.Trigger v

        member x.EnableAsync() =
        
            async {

                try
                    Debug.WriteLine $"\n\nPreparing to subscribe to topic: {topic}\n\n"
                    Debug.WriteLine $"\n\nConnection: {connectionString}\n\n"

                    serviceBusClient <- new ServiceBusClient(connectionString)

                    let msgOptions = ServiceBusProcessorOptions(AutoCompleteMessages       = true,
                                                                MaxAutoLockRenewalDuration = TimeSpan.FromHours(1.0),
                                                                MaxConcurrentCalls         = 1)

                    processor <- serviceBusClient.CreateProcessor(topic, subscription, msgOptions) 

                    processor.add_ProcessMessageAsync(fun v -> processMessageAsync      v)
                    processor.add_ProcessErrorAsync  (fun v -> exceptionReceivedHandler v)

                    do! processor.StartProcessingAsync() |> Async.AwaitTask

                    Debug.WriteLine <| $"\n\nSubscribed to topic: {topic}\n\n"

                with 
                    ex -> Debug.WriteLine(ex.GetBaseException().Message)
                          Debug.WriteLine $"\n\nConnection: {connectionString}\n\n"

            } |> Async.StartAsTask

        member x.DisableAsync() =

            async {
            
                Debug.WriteLine $"\n\nDisabling subscription for topic: {topic}\n\n"

                try
                    if processor <> null && 
                        not (processor.IsClosed) then

                        let! result =

                            Async.Catch(
                            
                              async { serviceBusClient.DisposeAsync() |> ignore
                                      processor.CloseAsync()          |> ignore
                                      Debug.WriteLine $"\n\nDisabled subscription for topic: {topic}\n\n" 
                                    })                                      

                        match result with
                        | Choice1Of2 () -> ()
                        | Choice2Of2 ex -> Debug.WriteLine($"\n\n{ex.StackTrace.ToString()}\n\n")

                with ex -> 
                    let error = ex.StackTrace.ToString()
                    Debug.WriteLine($"\n\n{error}\n\n")
            
            } |> Async.StartAsTask

    [<CLIEvent>]
    member x.Notification = requested.Publish

IEventNotifier

namespace Notifications.DataGateway

open System.Threading.Tasks

type IEventNotifier<'a> =

    abstract EnableAsync  : unit -> Task<unit>
    abstract DisableAsync : unit -> Task<unit>
    abstract Notify<'a>   : 'a   -> unit
    abstract IsEnabled    : bool

Notification Infrastructure

namespace Notifications

module DataTransfer =

    type Filter = { OnLabel : string }

    type SubscriptionInfo = {
        Topic            : string
        Subscription     : string
        ConnectionString : string
        Filter           : Filter option
    }

Plumbing

namespace rec OrderRequest

type CouriersFoundDelegate     = delegate of obj * DataTransfer.CouriersFound -> unit
  let couriersFound (requestId:string) = {
        Topic            = createTopicId "Topic.couriers-found" (Guid.Parse(requestId))
        Subscription     = CustomerApp
        ConnectionString = ServiceBusConnection.Instance.ConnectionString
        Filter           = None
    }

Subscription Client

    let couriersFound         = Event<CouriersFoundDelegate,DataTransfer.CouriersFound>()
    let couriersFoundListener = AzureTopicListener<DataTransfer.CouriersFound>(For.couriersFound requestId)
    let couriersFoundNotifier = couriersFoundListener :> IEventNotifier<DataTransfer.CouriersFound>
    do  couriersFoundListener.Notification.Add(fun v -> couriersFound.Trigger(this, eventData)

                                                        ...
                                              )

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

%d bloggers like this: