Here’s how I once implemented an Azure Service Bus topic subscription listener in F#:
// AzureTopicListener: an IEventNotifier implementation built on an Azure Service Bus
// SubscriptionClient for one topic/subscription pair.
//
// What the visible code does:
//   * The constructor takes a SubscriptionInfo and reads its Topic, Subscription,
//     ConnectionString and Filter members.
//   * exceptionReceivedHandler writes the base exception message to Debug and returns
//     Task.CompletedTask.
//   * processMessageAsync decodes message.Body as UTF-8, calls CompleteAsync with the
//     message's lock token (blocking through Async.RunSynchronously), then deserializes
//     the JSON and hands the payload to Notify. Any exception is written to Debug and a
//     completed Task is returned.
//   * Notify triggers the `requested` event; that event is exposed as `Notification`.
//   * EnableAsync does its work only while `flag` is false and sets `flag` to true first.
//     It creates the SubscriptionClient, removes the default rule when one is found, adds
//     the filter rule when it is not already present (started with Async.Start, so it is
//     not awaited), and configures MessageHandlerOptions with AutoComplete = false and a
//     one-minute MaxAutoRenewDuration. Exceptions are written to Debug.
//   * DisableAsync calls CloseAsync on the client. Both EnableAsync and DisableAsync are
//     returned through Async.StartAsTask.
//
// NOTE(review): this listing appears to have lost text during extraction (everything that
//   sat between angle brackets). Fragments such as `Event()`, `subscriptionClient null`,
//   `Some function`, `Some Async.AwaitTask`, `flag Debug.WriteLine` and `[]` are incomplete,
//   and createSubscriptionFilterAsync, managementClient, rulesFound and
//   ruleDescriptionOption are used without a visible binding. It will not compile as
//   shown - restore it from the original source before reuse.
// NOTE(review): `flag` is declared with `static let mutable`, so it is shared by every
//   AzureTopicListener instance rather than being per-listener state - confirm intended.
// NOTE(review): processMessageAsync completes the message before deserializing and
//   notifying, so a failure in those later steps happens after completion - confirm
//   this ordering is intended.
// NOTE(review): DisableAsync calls CloseAsync only when IsClosedOrClosing is true, which
//   looks inverted - verify against the original (a `not` may have been lost).
namespace OrderRequest.DataGateway open System open System.Text open System.Linq open System.Diagnostics open System.Threading.Tasks open System.Threading open Microsoft.Azure.ServiceBus open Microsoft.Azure.ServiceBus.Management open Newtonsoft.Json open OrderRequest.Core open OrderRequest.Specification.DataTransfer type AzureTopicListener(subscriptionInfo:SubscriptionInfo) as x = let topic = subscriptionInfo.Topic let subscription = subscriptionInfo.Subscription let connectionString = subscriptionInfo.ConnectionString let filterOption = subscriptionInfo.Filter static let mutable flag = false let requested = Event() let mutable subscriptionClient : SubscriptionClient = null let ruleNameOption = filterOption |> function | None -> None | Some filter -> Some function | None -> None | Some filter -> ruleNameOption |> function | None -> None | Some name -> Some Async.AwaitTask let hasRule = ruleNameOption |> function | None -> false | Some ruleName -> rulesFound.Any(fun r -> r.Name = ruleName) if not (hasRule) then ruleDescriptionOption |> function | None -> () | Some ruleDescription -> async { let subscriptionDescription = SubscriptionDescription(topic, subscription) do! 
managementClient.CreateSubscriptionAsync(subscriptionDescription, ruleDescription) |> Async.AwaitTask |> Async.Ignore } |> Async.RunSynchronously with ex -> Debug.WriteLine(ex.GetBaseException().Message) } let exceptionReceivedHandler (args:ExceptionReceivedEventArgs) = Debug.WriteLine(args.Exception.GetBaseException().Message) Task.CompletedTask let processMessageAsync (message:Message) (_:CancellationToken) = try let json = Encoding.UTF8.GetString(message.Body) subscriptionClient.CompleteAsync(message.SystemProperties.LockToken) |> Async.AwaitTask |> Async.RunSynchronously let payload = JsonConvert.DeserializeObject(json) (x :> IEventNotifier).Notify payload Task.CompletedTask with ex -> async { Debug.WriteLine(ex.GetBaseException().Message) return Task.CompletedTask } |> Async.RunSynchronously interface IEventNotifier with member x.IsEnabled with get() = subscriptionClient null && not (subscriptionClient.IsClosedOrClosing) member x.Notify(order) = requested.Trigger order member x.EnableAsync() = async { try if not (flag) then flag <- true do! createSubscriptionFilterAsync() subscriptionClient <- new SubscriptionClient(connectionString, topic, subscription) subscriptionClient.OperationTimeout Async.AwaitTask let hasDefaultRule = rulesFound.Any(fun r -> r.Name = RuleDescription.DefaultRuleName) if hasDefaultRule then do! subscriptionClient.RemoveRuleAsync(RuleDescription.DefaultRuleName) |> Async.AwaitTask ruleNameOption |> function | None -> () | Some ruleName -> let alreadyHasRule = rulesFound.Any(fun r -> r.Name = ruleName) if not (alreadyHasRule) then ruleDescriptionOption |> function | None -> () | Some ruleDescription -> async { do! 
subscriptionClient.AddRuleAsync(ruleDescription) |> Async.AwaitTask } |> Async.Start let msgOptions = MessageHandlerOptions(fun args -> exceptionReceivedHandler(args)) msgOptions.AutoComplete <- false msgOptions.MaxAutoRenewDuration <- TimeSpan.FromMinutes(1.0) msgOptions.MaxConcurrentCalls Async.RunSynchronously with ex -> Debug.WriteLine(ex.GetBaseException().Message) } |> Async.StartAsTask member x.DisableAsync() = async { try if subscriptionClient.IsClosedOrClosing then do! subscriptionClient.CloseAsync() |> Async.AwaitTask flag Debug.WriteLine(ex.GetBaseException().Message) } |> Async.StartAsTask [] member x.Notification = requested.Publish