Azure Topic Listener
namespace Notifications.DataGateway
open System
open System.Text
open System.Diagnostics
open System.Threading.Tasks
open Azure.Messaging.ServiceBus
open Newtonsoft.Json
open Notifications.DataTransfer
open Notifications.DataGateway
/// Listens to one Azure Service Bus topic subscription and republishes every
/// received message, deserialized from UTF-8 JSON into 'T, through the
/// `Notification` event.
///
/// `subscriptionInfo` supplies the topic name, subscription name and
/// connection string; its `Filter` field is not read by this type.
type AzureTopicListener<'T>(subscriptionInfo:SubscriptionInfo) as x =

    let topic            = subscriptionInfo.Topic
    let subscription     = subscriptionInfo.Subscription
    let connectionString = subscriptionInfo.ConnectionString

    // Backing event for the public `Notification` member.
    let requested = Event<_>()

    let mutable serviceBusClient : ServiceBusClient    = null
    let mutable processor        : ServiceBusProcessor = null

    /// True once a client has been created and has not been closed since.
    let clientIsOpen () =
        not (isNull serviceBusClient) && not serviceBusClient.IsClosed

    /// Closes the processor and then disposes the client, awaiting both.
    /// Each step is skipped when its resource was never created or is
    /// already closed, so calling this repeatedly is harmless.
    let shutdownAsync () =
        async {
            // Processor first: it was created from the client.
            if not (isNull processor) && not processor.IsClosed then
                do! processor.CloseAsync() |> Async.AwaitTask
            if not (isNull serviceBusClient) && not serviceBusClient.IsClosed then
                do! serviceBusClient.DisposeAsync().AsTask() |> Async.AwaitTask
        }

    /// Processor error callback: logs the root cause and carries on.
    let exceptionReceivedHandler (args:ProcessErrorEventArgs) =
        Debug.WriteLine(sprintf "\nERROR: %s\n" <| args.Exception.GetBaseException().Message)
        Task.CompletedTask

    /// Processor message callback. Returns the running task instead of
    /// blocking the callback thread until the work has finished.
    let processMessageAsync (v:ProcessMessageEventArgs) : Task =
        let work =
            async {
                Debug.WriteLine $"\nProcessing message for: {subscriptionInfo.Topic}\n"
                try
                    // The message is settled before it is deserialized, so a
                    // payload that fails below is logged and not retried here.
                    do! v.CompleteMessageAsync(v.Message) |> Async.AwaitTask
                    let json    = Encoding.UTF8.GetString(v.Message.Body.ToArray())
                    let payload = JsonConvert.DeserializeObject<'T>(json)
                    (x :> IEventNotifier<'T>).Notify payload
                with ex ->
                    Debug.WriteLine(ex.GetBaseException().Message)
            }
        Async.StartAsTask work :> Task

    interface IEventNotifier<'T> with

        /// True while the Service Bus client exists and is not closed.
        member x.IsEnabled with get() = clientIsOpen ()

        /// Raises `Notification` with the given payload.
        member x.Notify(v) = requested.Trigger v

        /// Creates the client and processor and starts receiving.
        /// Does nothing when already enabled. Failures are logged rather than
        /// raised; callers can inspect `IsEnabled` afterwards.
        member x.EnableAsync() =
            async {
                if clientIsOpen () then
                    // A second client/processor pair would orphan the running one.
                    Debug.WriteLine $"\n\nAlready subscribed to topic: {topic}\n\n"
                else
                    try
                        Debug.WriteLine $"\n\nPreparing to subscribe to topic: {topic}\n\n"
                        // The connection string is a credential and is deliberately
                        // not written to the debug output.
                        serviceBusClient <- new ServiceBusClient(connectionString)

                        let msgOptions = ServiceBusProcessorOptions(AutoCompleteMessages       = true,
                                                                    MaxAutoLockRenewalDuration = TimeSpan.FromHours(1.0),
                                                                    MaxConcurrentCalls         = 1)

                        processor <- serviceBusClient.CreateProcessor(topic, subscription, msgOptions)
                        processor.add_ProcessMessageAsync(fun v -> processMessageAsync v)
                        processor.add_ProcessErrorAsync (fun v -> exceptionReceivedHandler v)

                        do! processor.StartProcessingAsync() |> Async.AwaitTask
                        Debug.WriteLine $"\n\nSubscribed to topic: {topic}\n\n"
                    with ex ->
                        Debug.WriteLine(ex.GetBaseException().Message)
                        // Release whatever was created so IsEnabled does not
                        // report true for a listener that never started.
                        let! cleanup = Async.Catch (shutdownAsync ())
                        match cleanup with
                        | Choice1Of2 ()        -> ()
                        | Choice2Of2 cleanupEx -> Debug.WriteLine(cleanupEx.GetBaseException().Message)
            } |> Async.StartAsTask

        /// Stops receiving and releases the processor and the client.
        /// The returned task completes only after both have shut down.
        /// Failures are logged rather than raised.
        member x.DisableAsync() =
            async {
                Debug.WriteLine $"\n\nDisabling subscription for topic: {topic}\n\n"
                try
                    do! shutdownAsync ()
                    Debug.WriteLine $"\n\nDisabled subscription for topic: {topic}\n\n"
                with ex ->
                    // Log the exception itself: StackTrace may be null.
                    Debug.WriteLine($"\n\n{ex}\n\n")
            } |> Async.StartAsTask

    /// Raised once per successfully deserialized message.
    [<CLIEvent>]
    member x.Notification = requested.Publish
IEventNotifier
namespace Notifications.DataGateway
open System.Threading.Tasks
/// Contract for a notification source that can be switched on and off and
/// that hands payloads of type 'a to its consumers.
type IEventNotifier<'a> =

    /// Starts the notifier; the returned task completes when start-up has finished.
    abstract member EnableAsync  : unit -> Task<unit>

    /// Stops the notifier; the returned task completes when shutdown has finished.
    abstract member DisableAsync : unit -> Task<unit>

    /// Delivers one payload to the notifier's consumers.
    // NOTE(review): the explicit <'a> repeats the interface's own type
    // parameter name; confirm a per-method generic is not intended.
    abstract member Notify<'a>   : 'a -> unit

    /// Whether the notifier is currently enabled.
    abstract member IsEnabled    : bool with get
Notification Infrastructure
namespace Notifications
/// Plain data records shared by the notification components.
module DataTransfer =

    /// Optional subscription filter.
    // NOTE(review): presumably matches on a message label; confirm against the consumer.
    type Filter =
        { OnLabel : string }

    /// Everything needed to attach to one topic subscription.
    type SubscriptionInfo =
        { /// Name of the topic.
          Topic            : string
          /// Name of the subscription on that topic.
          Subscription     : string
          /// Connection string for the messaging namespace (a credential).
          ConnectionString : string
          /// Optional filter; None when no filtering is requested.
          Filter           : Filter option }
Plumbing
namespace rec OrderRequest
/// Handler signature for a couriers-found event: takes the sender as obj and
/// a DataTransfer.CouriersFound payload, and returns unit.
type CouriersFoundDelegate = delegate of obj * DataTransfer.CouriersFound -> unit
/// Builds the subscription record for the "couriers found" topic of a request.
/// `requestId` must be the textual form of a GUID; Guid.Parse raises otherwise.
let couriersFound (requestId:string) =
    // Parse first so a malformed id fails before anything else is read.
    let requestGuid = Guid.Parse(requestId)
    let topicId     = createTopicId "Topic.couriers-found" requestGuid
    { Topic            = topicId
      Subscription     = CustomerApp
      ConnectionString = ServiceBusConnection.Instance.ConnectionString
      Filter           = None }
Subscription Client
// Event re-exposed to consumers, using the CouriersFoundDelegate handler shape.
let couriersFound = Event<CouriersFoundDelegate,DataTransfer.CouriersFound>()
// Listener for the topic subscription described by `For.couriersFound requestId`
// (`requestId` is bound outside this snippet).
let couriersFoundListener = AzureTopicListener<DataTransfer.CouriersFound>(For.couriersFound requestId)
// The same listener viewed through its IEventNotifier interface.
let couriersFoundNotifier = couriersFoundListener :> IEventNotifier<DataTransfer.CouriersFound>
// Forwards each listener notification to the `couriersFound` event.
// NOTE(review): the handler ignores its argument `v` and passes `eventData`,
// which is not bound in the visible code; the elided `...` may define it. Confirm.
do couriersFoundListener.Notification.Add(fun v -> couriersFound.Trigger(this, eventData)
...
)
Like this:
Like Loading...