CosmosDB and Redis are two data stores with different responsibilities. CosmosDB is a NoSQL data store that can serve as an alternative to SQL Server when accessed through its SQL client API. Redis is an in-memory caching technology meant to reduce latency, cost, and compute load between client and server. The repo for this brain dump can be found on GitHub.
Client Test
The following is an integration test for saving a registration request:
module BillableSubscription.Sync.Tests

open System.Configuration
open NUnit.Framework
open BeachMobile.BillableSubscription.TestAPI.Mock
open BeachMobile.BillableSubscription.DataGateway
open BeachMobile.BillableSubscription.DataGateway.Cosmos

[<Test>]
let ``Sync save registration`` () =
    async {
        // Setup
        Cosmos.ConnectionString.Instance <- ConfigurationManager.AppSettings["cosmosConnectionString"]
        Redis.ConnectionString.Instance  <- ConfigurationManager.AppSettings["RedisConnectionString"]

        // Test
        match! someRegistration |> Post.registration |> Async.AwaitTask with
        | Error msg -> Assert.Fail msg
        | Ok receipt ->
            // Verify
            match! SyncLogic.Query.status receipt.Registration |> Async.AwaitTask with
            | Error msg -> Assert.Fail msg
            | Ok status -> Assert.That(status.Value.Registration.Request = someRegistration)
    }
Combining CosmosDB with Redis via Query
The query below implements the Cache-Aside pattern. Specifically, the code first checks the cache to satisfy a client request. If no value is found in the cache, the data is fetched from the master data store, which in this case is CosmosDB, and is then written to the cache so that subsequent requests can be served from memory.
The Cache-Aside pattern is implemented as follows:
namespace BeachMobile.BillableSubscription.DataGateway.SyncLogic

open BeachMobile.BillableSubscription.Language
open BeachMobile.BillableSubscription.Operations
open BeachMobile.BillableSubscription.DataGateway

module Query =

    let status : GetRegistrationStatus =
        fun v -> task {

            // Write a status fetched from CosmosDB into Redis
            let cache (status:RegistrationStatus) =
                async {
                    match! Redis.Post.registration status with
                    | Error msg -> return Error msg
                    | Ok ()     -> return Ok (Some status.Registration)
                }

            match! Redis.Get.status v with
            | Error msg   -> return Error msg
            | Ok (Some r) -> return Ok (Some r)   // cache hit
            | Ok None ->                          // cache miss: fall back to CosmosDB
                match! Cosmos.Get.status v with
                | Error msg   -> return Error msg
                | Ok None     -> return Ok None
                | Ok (Some r) ->
                    match! cache r with
                    | Error msg -> return Error msg
                    | Ok _      -> return Ok (Some r)
        }

    let paymentHistory : GetPaymentHistory =
        fun v -> task {

            // Write the payments into Redis, then hand the same payments back to the caller
            let cache items =
                async {
                    match! Redis.Post.paymentHistory { SubscriptionId = v; Payments = items } with
                    | Error msg -> return Error msg
                    | Ok ()     -> return Ok (Some items)
                }

            match! Redis.Get.paymentHistory v with
            | Error msg   -> return Error msg
            | Ok (Some r) -> return Ok (Some r)   // cache hit
            | Ok None ->                          // cache miss: fall back to CosmosDB
                match! Cosmos.Get.paymentHistory v with
                | Error msg   -> return Error msg
                | Ok None     -> return! cache Seq.empty
                | Ok (Some r) -> return! cache r
        }
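Stripped of the Redis and CosmosDB specifics, the Cache-Aside flow above reduces to a small higher-order function. The following is a minimal sketch rather than the repo's implementation: `cacheAside`, `readCache`, `writeCache`, and `readStore` are hypothetical names, a `Dictionary` stands in for Redis, and a plain function stands in for CosmosDB.

```fsharp
open System.Collections.Generic

// Generic Cache-Aside: try the cache, fall back to the store, then populate the cache.
let cacheAside
        (readCache  : 'k -> Result<'v option, string>)
        (writeCache : 'k -> 'v -> Result<unit, string>)
        (readStore  : 'k -> Result<'v option, string>)
        (key : 'k) : Result<'v option, string> =
    match readCache key with
    | Error msg     -> Error msg
    | Ok (Some hit) -> Ok (Some hit)
    | Ok None ->
        match readStore key with
        | Error msg       -> Error msg
        | Ok None         -> Ok None
        | Ok (Some value) ->
            writeCache key value |> Result.map (fun () -> Some value)

// Hypothetical stand-ins: a dictionary plays Redis, a function plays CosmosDB.
let cache = Dictionary<string, string>()
let storeReads = ref 0

let readCache (key: string) : Result<string option, string> =
    match cache.TryGetValue key with
    | true, value -> Ok (Some value)
    | false, _    -> Ok None

let writeCache (key: string) (value: string) : Result<unit, string> =
    cache.[key] <- value
    Ok ()

let readStore (key: string) : Result<string option, string> =
    storeReads.Value <- storeReads.Value + 1
    if key = "tenant-1:basic" then Ok (Some "Pending") else Ok None

let lookup = cacheAside readCache writeCache readStore

let first   = lookup "tenant-1:basic"   // miss: reads the store, then fills the cache
let second  = lookup "tenant-1:basic"   // hit: served from the cache
let missing = lookup "tenant-2:pro"     // absent everywhere: nothing is cached
```

Passing the three functions as parameters keeps the pattern independent of any particular cache or database, which also makes it straightforward to exercise without a running Redis or CosmosDB instance.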
CosmosDB Query Logic
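CosmosDB stores documents in containers. Accessing a container requires a client, a database name, and a container id. The client below is built from the value held in ConnectionString.Instance together with an Azure credential; note that the CosmosClient constructor that accepts a token credential expects the account endpoint URI as its first argument rather than a full connection string.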
The following code exposes container ids and the logic to access a container:
namespace BeachMobile.BillableSubscription.DataGateway.Cosmos

open Microsoft.Azure.Cosmos
open Azure.Identity

type ConnectionString() = static member val Instance = "" with get,set

// Container ids
module Container =
    let registration       = "Registration"
    let registrationStatus = "RegistrationStatus"
    let payments           = "Payments"
    let paymentHistory     = "PaymentHistory"

module Database =
    let name = "beachmobile-db"

    // Nested so that opening Database exposes Container.get alongside the ids above
    module Container =
        let get (db:string) (containerId:string) =
            let client    = new CosmosClient(ConnectionString.Instance, DefaultAzureCredential())
            let database  = client.GetDatabase(db)
            let container = database.GetContainer(containerId)
            container
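Container.get above builds a new client on every call, whereas the Azure Cosmos DB SDK guidance is to reuse a single client for the lifetime of the application. The sketch below shows one way to do that with a lazy value. `FakeClient` and `clientsCreated` are hypothetical stand-ins so the snippet runs without an Azure account; with the real SDK the lazy body would be `new CosmosClient(ConnectionString.Instance, DefaultAzureCredential())`.

```fsharp
// Counts constructions so the sharing behavior is observable.
let clientsCreated = ref 0

// Hypothetical stand-in for an expensive client such as CosmosClient.
type FakeClient(endpoint: string) =
    do clientsCreated.Value <- clientsCreated.Value + 1
    member _.GetContainer(db: string, containerId: string) =
        $"{endpoint}/{db}/{containerId}"

// Created on first use, then shared by every later call.
let sharedClient = lazy (FakeClient("https://example.invalid"))

let get (db: string) (containerId: string) =
    sharedClient.Value.GetContainer(db, containerId)

let registration   = get "beachmobile-db" "Registration"
let paymentHistory = get "beachmobile-db" "PaymentHistory"
```

Both calls go through the same client instance, so the construction cost is paid once. The same idea would apply to the Redis ConnectionMultiplexer used later in this post.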
The following is the logic for querying registration and payment history:
namespace BeachMobile.BillableSubscription.DataGateway.Cosmos

open Microsoft.Azure.Cosmos
open BeachMobile.BillableSubscription.Language
open BeachMobile.BillableSubscription.Operations
open BeachMobile.BillableSubscription.Entities
open BeachMobile.BillableSubscription.DataGateway.Common
open BeachMobile.BillableSubscription.DataGateway.Cosmos
open BeachMobile.BillableSubscription.DataGateway.Cosmos.Database
open BeachMobile.BillableSubscription.DataGateway.Redis

module Get =

    let status : GetRegistrationStatus =
        fun v -> task {
            try
                let container    = Container.get Database.name Container.registration
                let partitionKey = PartitionKey(KeyFor.registrationStatus(v.Request.TenantId, v.Request.Plan))

                match! container.ReadItemAsync<RegistrationStatusEntity>(v.id, partitionKey) with
                | response when response.StatusCode = System.Net.HttpStatusCode.OK ->
                    return Ok (Some response.Resource.Status)
                | _ -> return Error "Failed to retrieve registration status"
            with
            | :? CosmosException as ex when ex.StatusCode = System.Net.HttpStatusCode.NotFound ->
                return Ok None   // ReadItemAsync throws when the item is missing; report "no value"
            | ex -> return ex |> toError
        }

    let paymentHistory : GetPaymentHistory =
        fun v -> task {
            try
                let container = Container.get Database.name Container.paymentHistory

                match! container.ReadItemAsync<PaymentHistoryEntity>(v, PartitionKey(Container.paymentHistory)) with
                | response when response.StatusCode = System.Net.HttpStatusCode.OK ->
                    return Ok (Some response.Resource.Payments)
                | _ -> return Error "Failed to retrieve payment history"
            with
            | :? CosmosException as ex when ex.StatusCode = System.Net.HttpStatusCode.NotFound ->
                return Ok None   // ReadItemAsync throws when the item is missing; report "no value"
            | ex -> return ex |> toError
        }
Redis Query Logic
Redis is a key/value store that keeps its data in a server's memory rather than on disk. Querying the cache is as follows:
namespace BeachMobile.BillableSubscription.DataGateway.Redis

open Newtonsoft.Json
open StackExchange.Redis
open BeachMobile.BillableSubscription.Operations
open BeachMobile.BillableSubscription.Language

module Get =

    let status : GetRegistrationStatus =
        fun v -> task {
            try
                let! connection = ConnectionMultiplexer.ConnectAsync(ConnectionString.Instance) |> Async.AwaitTask
                let cache    = connection.GetDatabase()
                let receipt  = v.Request
                let response = cache.StringGet(KeyFor.registrationStatus(receipt.TenantId, receipt.Plan))

                do! connection.CloseAsync() |> Async.AwaitTask

                match response.HasValue with
                | false -> return Ok None   // cache miss
                | true  -> return Ok (Some (JsonConvert.DeserializeObject<RegistrationStatus>(string response)))
            with ex ->
                let msg = ex.GetBaseException().Message
                return Error msg
        }

    let paymentHistory : GetPaymentHistory =
        fun v -> task {
            let! connection = ConnectionMultiplexer.ConnectAsync(ConnectionString.Instance) |> Async.AwaitTask
            let cache    = connection.GetDatabase()
            // Read with the same key that Post.paymentHistory writes
            let response = cache.StringGet(KeyFor.paymentHistory v)

            do! connection.CloseAsync() |> Async.AwaitTask

            match response.HasValue with
            | false -> return Ok None   // cache miss
            | true  ->
                // Post.paymentHistory caches a PaymentHistory record, so deserialize that shape
                let history = JsonConvert.DeserializeObject<PaymentHistory>(string response)
                return Ok (Some history.Payments)
        }
CosmosDB Store Logic
namespace BeachMobile.BillableSubscription.DataGateway.Cosmos

open System
open System.Net
open BeachMobile.BillableSubscription.Entities
open BeachMobile.BillableSubscription.Operations
open BeachMobile.BillableSubscription.Language
open BeachMobile.BillableSubscription.DataGateway.Common
open BeachMobile.BillableSubscription.DataGateway.Cosmos
open BeachMobile.BillableSubscription.DataGateway.Cosmos.Database

// CosmosDB Permissions
(*
    $resourceGroupName="*****"
    $accountName="*****"
    $principalId="*****" # Often called Object ID
    $dataContributorRoleId="00000000-0000-0000-0000-000000000002"

    az cosmosdb sql role assignment create `
        --account-name $accountName `
        --resource-group $resourceGroupName `
        --scope "/" `
        --principal-id $principalId `
        --role-definition-id $dataContributorRoleId
*)

module Post =

    let registration : RequestRegistration =
        fun v -> task {
            try
                let container = Container.get Database.name Container.registration

                let item : RegistrationRequestEntity = {
                    id = Guid.NewGuid() |> string
                    RegistrationRequest = v
                }

                let receipt : RegistrationReceipt = {
                    id        = item.id
                    Request   = v
                    Timestamp = DateTime.UtcNow
                }

                let status : RegistrationStatus = {
                    Registration = receipt
                    Status       = "Pending"
                    Timestamp    = DateTime.UtcNow
                }

                let registration = status.Registration.Request

                let status : RegistrationStatusEntity = {
                    id          = item.id
                    PartitionId = $"{registration.TenantId}:{registration.Plan}"
                    Status      = status
                }

                match! container.CreateItemAsync<RegistrationStatusEntity>(status, Microsoft.Azure.Cosmos.PartitionKey(status.PartitionId)) |> Async.AwaitTask with
                | response when response.StatusCode = HttpStatusCode.Created ->
                    return Ok { Registration = receipt
                                Status       = "Pending"
                                Timestamp    = receipt.Timestamp }
                | response -> return Error (response.StatusCode.ToString())
            with ex -> return ex |> toError
        }

    let payment : SubmitPayment =
        fun v -> task {
            try
                let container = Container.get Database.name Container.payments

                let request : PaymentRequestEntity = {
                    id             = Guid.NewGuid() |> string
                    PartitionId    = v.Subscription.BillablePlan.Plan.Name
                    PaymentRequest = v
                }

                match! container.UpsertItemAsync<PaymentRequestEntity>(request) |> Async.AwaitTask with
                | response when response.StatusCode = HttpStatusCode.Created ->
                    return Ok { Payment = v; Timestamp = DateTime.UtcNow }
                | response -> return Error (response.StatusCode.ToString())
            with ex -> return ex |> toError
        }
Redis Store Logic
namespace BeachMobile.BillableSubscription.DataGateway.Redis

open System
open Newtonsoft.Json
open BeachMobile.BillableSubscription.Language
open BeachMobile.BillableSubscription.Operations
open BeachMobile.BillableSubscription.Entities
open BeachMobile.BillableSubscription.DataGateway
open BeachMobile.BillableSubscription.DataGateway.Cosmos
open StackExchange.Redis

// Documentation:
// https://medium.com/@sadigrzazada20/getting-started-with-redis-in-c-using-stackexchange-redis-353a9d65a136
// https://stackoverflow.com/questions/60927540/add-expiry-to-redis-cache

module Post =

    // Store a value, then give its key a 30-second time-to-live
    let private register (cache:IDatabase) (key:string) (value:string) =
        match cache.StringSet(key, value) with
        | false -> Error Msg.failedCacheItemRegistration
        | true  ->
            match cache.KeyExpire(key, new TimeSpan(0,0,30)) with
            | false -> Error Msg.failedSetexpiration
            | true  -> Ok ()

    // ConnectionString is qualified with Redis throughout because the Cosmos
    // namespace, which declares its own ConnectionString type, is also opened above.
    let registration (v:RegistrationStatus) =
        async {
            let! connection = ConnectionMultiplexer.ConnectAsync(Redis.ConnectionString.Instance) |> Async.AwaitTask
            let cache   = connection.GetDatabase()
            let json    = JsonConvert.SerializeObject(v)
            let receipt = v.Registration.Request
            let key     = KeyFor.registrationStatus(receipt.TenantId, receipt.Plan)
            let result  = register cache key json

            do! connection.CloseAsync() |> Async.AwaitTask
            return result
        }

    let payment : SubmitPayment =
        fun v ->
            task {
                let! connection = ConnectionMultiplexer.ConnectAsync(Redis.ConnectionString.Instance) |> Async.AwaitTask
                let cache = connection.GetDatabase()

                let data : PaymentRequestEntity = {
                    id             = Guid.NewGuid() |> string
                    PartitionId    = "hello-world"
                    PaymentRequest = v
                }

                let json = JsonConvert.SerializeObject(data)

                match register cache (KeyFor.payment data.id) json with
                | Error msg ->
                    do! connection.CloseAsync() |> Async.AwaitTask
                    return Error msg
                | Ok () ->
                    do! connection.CloseAsync() |> Async.AwaitTask
                    return Ok { Payment = v; Timestamp = DateTime.UtcNow }
            }

    let paymentHistory (v:PaymentHistory) =
        async {
            let! connection = ConnectionMultiplexer.ConnectAsync(Redis.ConnectionString.Instance) |> Async.AwaitTask
            let cache  = connection.GetDatabase()
            let json   = JsonConvert.SerializeObject(v)
            let result = register cache (KeyFor.paymentHistory v.SubscriptionId) json

            do! connection.CloseAsync() |> Async.AwaitTask
            return result
        }
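The register helper above stores a value and then gives its key a 30-second time-to-live, so a cached entry is only served for a short window before the next read falls through to CosmosDB. The following in-memory sketch illustrates that expiry behavior with an injected clock. `TtlCache` and its members are hypothetical and no Redis call is made.

```fsharp
open System
open System.Collections.Generic

// In-memory stand-in for a cache with per-key expiry (not the Redis API).
type TtlCache(now: unit -> DateTime) =
    let entries = Dictionary<string, string * DateTime>()

    // Store a value together with the instant at which it stops being valid.
    member _.Set(key: string, value: string, ttl: TimeSpan) =
        entries.[key] <- (value, now () + ttl)

    // Return the value only while it has not expired; drop it otherwise.
    member _.TryGet(key: string) : string option =
        match entries.TryGetValue key with
        | true, (value, expiresAt) when now () < expiresAt -> Some value
        | true, _ ->
            entries.Remove key |> ignore
            None
        | false, _ -> None

// A controllable clock makes the expiry observable without waiting.
let clock = ref (DateTime(2024, 1, 1, 0, 0, 0, DateTimeKind.Utc))
let cache = TtlCache(fun () -> clock.Value)

cache.Set("tenant-1:basic", "Pending", TimeSpan.FromSeconds(30.0))
let fresh = cache.TryGet("tenant-1:basic")   // read inside the 30-second window

clock.Value <- clock.Value.AddSeconds(31.0)
let stale = cache.TryGet("tenant-1:basic")   // read after the window has passed
```

A short time-to-live bounds how long the cache can disagree with CosmosDB, at the cost of more frequent fall-through reads.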
Appendix
The solution for this brain dump can be found on GitHub.
Specification
I prefer to declare the core business operations of a service before implementing them. The code below exposes the core operations as function types:
namespace BeachMobile.BillableSubscription

open System.Threading.Tasks
open Language

module Operations =

    type RequestRegistration   = RegistrationRequest -> Task<Result<RegistrationStatus, ErrorDescription>>
    type GetRegistrationStatus = RegistrationReceipt -> Task<Result<Option<RegistrationStatus>, ErrorDescription>>

    type SubmitPayment     = PaymentRequest -> Task<Result<SuccessfulPayment, ErrorDescription>>
    type GetPaymentHistory = SubscriptionId -> Task<Result<option<seq<SuccessfulPayment>>, ErrorDescription>>
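Declaring an operation as a function type lets the compiler check every implementation against the same contract, which is how the Redis, Cosmos, and SyncLogic modules can each expose a value annotated with GetRegistrationStatus. Here is a small sketch of the idea using simplified stand-in types; `GetStatus`, `fromMemory`, `withFallback`, and `alwaysEmpty` are hypothetical names.

```fsharp
open System.Threading.Tasks

// The contract: declared once, before any implementation exists.
type GetStatus = string -> Task<Result<string option, string>>

// One implementation; the annotation makes the compiler enforce the contract.
let fromMemory : GetStatus =
    fun key -> task {
        if key = "tenant-1:basic" then return Ok (Some "Pending")
        else return Ok None
    }

// Another implementation that never finds anything.
let alwaysEmpty : GetStatus =
    fun _ -> task { return Ok None }

// Because both sides share the contract, they compose into a third GetStatus.
let withFallback (primary: GetStatus) (secondary: GetStatus) : GetStatus =
    fun key -> task {
        match! primary key with
        | Ok None -> return! secondary key
        | other   -> return other
    }

let lookup  = withFallback alwaysEmpty fromMemory
let found   = (lookup "tenant-1:basic").Result
let missing = (lookup "tenant-2:pro").Result
```

Callers depend only on the function type, so an implementation can be swapped (for example, a cache-backed one for a database-backed one) without touching call sites.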
Redis Cache Configuration
namespace BeachMobile.BillableSubscription.DataGateway.Redis

module KeyFor =
    let payment(subscriptionId)           = $"Payment:{subscriptionId}"
    let registrationStatus(tenantId,plan) = $"{tenantId}:{plan}"
    let paymentHistory(subscriptionId)    = $"PaymentHistory:{subscriptionId}"
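Each KeyFor function produces the string under which a value is stored in Redis. Evaluating them on sample inputs shows the resulting keys. The ids below are made up, and the string annotations are added only to keep the snippet self-contained.

```fsharp
module KeyFor =
    let payment (subscriptionId: string)                    = $"Payment:{subscriptionId}"
    let registrationStatus (tenantId: string, plan: string) = $"{tenantId}:{plan}"
    let paymentHistory (subscriptionId: string)             = $"PaymentHistory:{subscriptionId}"

let paymentKey = KeyFor.payment "sub-42"                          // "Payment:sub-42"
let statusKey  = KeyFor.registrationStatus ("tenant-1", "basic")  // "tenant-1:basic"
let historyKey = KeyFor.paymentHistory "sub-42"                   // "PaymentHistory:sub-42"
```

The payment keys carry a type prefix while the registration-status key does not, so a prefix on the latter may be worth considering if other tenant-scoped values are ever cached in the same Redis database.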