The following code invokes an Azure topics generator:
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
using static System.Environment;

namespace Courier.ServiceBus.Functions
{
    /// <summary>
    /// Azure Function bound to the "Topic.courier-accepted" Service Bus topic
    /// through the "Subscription.all-messages" subscription.
    /// </summary>
    public static class CourierAcceptedFn
    {
        /// <summary>
        /// Handles one message: forwards the payload to the <c>Update</c> extension
        /// (declared outside this file) and then asks the resulting request to
        /// generate its topics.
        /// </summary>
        /// <param name="json">Message body delivered by the Service Bus trigger.</param>
        /// <param name="log">Logger passed through to the helper calls.</param>
        [FunctionName(nameof(CourierAcceptedFn))]
        public static async Task Run(
            [ServiceBusTrigger(
                "Topic.courier-accepted",
                "Subscription.all-messages",
                Connection = "ServiceBusConnectionKey")]
            string json,
            ILogger log)
        {
            var updatedRequest = await nameof(CourierAcceptedFn).Update(json, "courier-accepted", log);

            // NOTE(review): the trigger connects via "ServiceBusConnectionKey", but topic
            // generation is given the "StorageConnectionString" setting. Confirm that this
            // setting really holds the connection string the topic generator expects.
            var topicConnection = GetEnvironmentVariable("StorageConnectionString");

            await updatedRequest.GenerateTopics(topicConnection, log);
        }
    }
}
Here’s the topics generator:
namespace Courier.ServiceBus
{
    /// <summary>
    /// Extension methods that create the set of Service Bus topics belonging to a
    /// single <c>Request</c>.
    /// </summary>
    internal static class TopicsCreator
    {
        // Topic prefixes created for every request, in creation order.
        // Each prefix is listed exactly once: "Topic.courier-arrived-at-pickup" used to be
        // requested twice, and because AzureTopic.createAsync returns an Error for a topic
        // that already exists, the second attempt only produced a spurious error log.
        private static readonly string[] TopicPrefixes =
        {
            "Topic.courier-eta",
            "Topic.customer-cancelled",
            "Topic.courier-arrived-at-pickup",
            "Topic.courier-enroute-to-dropoff",
            "Topic.courier-arrived-at-dropoff",
            "Topic.courier-dropped-off",
        };

        /// <summary>
        /// Creates one topic per known prefix for <paramref name="request"/>, sequentially.
        /// </summary>
        /// <param name="request">Request whose <c>RequestId</c> is combined with each prefix.</param>
        /// <param name="connectionKey">Connection string handed to <c>AzureTopic.createAsync</c>.</param>
        /// <param name="log">Logger that receives any creation failure.</param>
        /// <returns>A task that completes once every topic creation has been attempted.</returns>
        public static async Task GenerateTopics(this Request request, string connectionKey, ILogger log)
        {
            foreach (var topicPrefix in TopicPrefixes)
            {
                await request.CreateTopic(topicPrefix, connectionKey, log);
            }
        }

        /// <summary>
        /// Attempts to create a single topic. Failures are logged instead of propagated,
        /// so one failing topic does not stop the remaining ones from being attempted.
        /// </summary>
        /// <param name="request">Request supplying the <c>RequestId</c> part of the topic name.</param>
        /// <param name="topicPrefix">Prefix part of the topic name.</param>
        /// <param name="connectionKey">Connection string handed to <c>AzureTopic.createAsync</c>.</param>
        /// <param name="log">Logger that receives error results and exceptions.</param>
        static async Task CreateTopic(this Request request, string topicPrefix, string connectionKey, ILogger log)
        {
            try
            {
                // createTopicId is declared outside this file; it builds the topic name
                // from the prefix and the request id.
                var topicName = createTopicId(topicPrefix, request.RequestId);
                var result = await AzureTopic.createAsync(topicName, connectionKey);

                if (result.IsError)
                {
                    log.LogError($"{nameof(CreateTopic)} failed:\n{result.ErrorValue}");
                }
            }
            catch (Exception ex)
            {
                // Pass the exception itself so the logger keeps the stack trace;
                // the message text is unchanged.
                log.LogError(ex, $"{topicPrefix} failed:\n{ex.GetBaseException().Message}");
            }
        }
    }
}
Here are the Azure topic CRUD operations:
// AzureTopic: helpers for Service Bus topics. Each function builds a ManagementClient
// from the given connection string and runs an async workflow that is handed to
// Async.StartAsTask.
//
//   existsAsync - calls TopicExistsAsync and returns Ok with its result.
//   createAsync - if TopicExistsAsync reports the topic is absent, creates it from a
//                 TopicDescription with EnablePartitioning = true and returns Ok ();
//                 otherwise returns Error.
//   deleteAsync - if TopicExistsAsync reports the topic is present, calls
//                 DeleteTopicAsync and returns Ok (); otherwise returns Error.
//
// NOTE(review): this text looks truncated. The Error payloads, the `with` handlers of
// createAsync/deleteAsync, and the closing `} |>` before each Async.StartAsTask appear to
// be missing, and the Task return annotations presumably carried type arguments.
// Restore these from the original source before compiling.
// NOTE(review): line breaks and indentation were lost as well; F# is
// indentation-sensitive, so the layout must be restored too.
namespace Courier open System.Threading.Tasks open Microsoft.Azure.ServiceBus.Management module AzureTopic = let existsAsync(topicName:string) (connectionstring:string) : Task = async { try let client = ManagementClient(connectionstring) let! result = client.TopicExistsAsync(topicName) |> Async.AwaitTask return Ok result with ex -> return Error Async.StartAsTask let createAsync(topicName:string) (connectionstring:string) : Task = async { try let client = ManagementClient(connectionstring) let! found = client.TopicExistsAsync(topicName) |> Async.AwaitTask if not found then let description = TopicDescription(topicName, EnablePartitioning= true) do! client.CreateTopicAsync(description) |> Async.AwaitTask |> Async.Ignore return Ok () else return Error return Error Async.StartAsTask let deleteAsync(topicName:string) (connectionstring:string) : Task = async { try let client = ManagementClient(connectionstring) let! found = client.TopicExistsAsync(topicName) |> Async.AwaitTask if found then do! client.DeleteTopicAsync(topicName) |> Async.AwaitTask return Ok () else return Error return Error Async.StartAsTask