Data Retrieval with Azure CosmosDB and Redis

CosmosDB and Redis are two data stores with different responsibilities. CosmosDB is a NoSQL document store that can serve as an alternative to SQL Server when accessed through its SQL API. Redis is an in-memory cache that is meant to reduce latency, cost, and compute load between client and server. The repo for this brain dump can be found on GitHub.

Client Test

The following is an integration test for saving a registration request:

module BillableSubscription.Sync.Tests

open System.Configuration
open NUnit.Framework
open BeachMobile.BillableSubscription.TestAPI.Mock
open BeachMobile.BillableSubscription.DataGateway
open BeachMobile.BillableSubscription.DataGateway.Cosmos

[<Test>]
let ``Sync save registration`` () =

    async {
    
        // Setup
        Cosmos.ConnectionString.Instance <- ConfigurationManager.AppSettings["cosmosConnectionString"]
        Redis.ConnectionString.Instance  <- ConfigurationManager.AppSettings["RedisConnectionString"]

        // Test
        match! someRegistration |> Post.registration |> Async.AwaitTask with
        | Error msg  -> Assert.Fail msg
        | Ok receipt ->

            // Verify
            match! SyncLogic.Query.status receipt.Registration |> Async.AwaitTask with
            | Error msg -> Assert.Fail msg
            | Ok status -> Assert.That(status.Value.Registration.Request = someRegistration)
    }

Combining CosmosDB with Redis via Query

The query below implements the Cache-Aside pattern. The code first checks the cache to satisfy a client request. If no value is found in the cache, the data is fetched from the master data store, which in this case is CosmosDB, and is then written to the cache so that later requests can be served from it.

The Cache-Aside pattern is implemented as follows:

namespace BeachMobile.BillableSubscription.DataGateway.SyncLogic

open BeachMobile.BillableSubscription.Language
open BeachMobile.BillableSubscription.Operations
open BeachMobile.BillableSubscription.DataGateway

module Query =

    let status : GetRegistrationStatus =

        fun v -> task {

            let cache(status:RegistrationStatus) =

                async {
                
                    match! Redis.Post.registration status with
                    | Error msg -> return Error msg
                    | Ok ()     -> return Ok (Some status.Registration)
                }
        
            match! Redis.Get.status v with
            | Error msg   -> return Error msg
            | Ok (Some r) -> return Ok (Some r)
            | Ok None -> 

                match! Cosmos.Get.status v with
                | Error msg   -> return Error msg
                | Ok None     -> return Ok None
                | Ok (Some r) ->

                    match! cache r with
                    | Error msg -> return Error msg
                    | Ok _ -> return Ok (Some r)
        }

    let paymentHistory : GetPaymentHistory = 

        fun v -> task {

            // Write the payments to Redis, then hand the same payments back to the caller
            let cache items =

                async {
                
                    match! Redis.Post.paymentHistory { SubscriptionId=v; Payments= items} with
                    | Error msg -> return Error msg
                    | Ok ()     -> return Ok (Some items)
                }
        
            match! Redis.Get.paymentHistory v with
            | Error msg   -> return Error msg
            | Ok (Some r) -> return Ok (Some r)
            | Ok None -> 

                match! Cosmos.Get.paymentHistory v with
                | Error msg   -> return Error msg
                // No history in the store: cache an empty sequence so the next lookup is a cache hit
                | Ok None     -> return! cache Seq.empty
                | Ok (Some r) -> return! cache r
        }
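
The lookup order is easier to see with the infrastructure removed. The following is a minimal sketch, not the code from the repo: a Dictionary stands in for Redis, the function readFromStore stands in for the CosmosDB query, and the key and status values are made up for illustration.

```fsharp
open System.Collections.Generic

// Hypothetical stand-ins: an in-memory dictionary plays the role of Redis,
// and a plain function plays the role of the CosmosDB lookup.
let cache = Dictionary<string, string>()
let mutable storeReads = 0

let readFromStore (key: string) : Result<string option, string> =
    storeReads <- storeReads + 1
    match key with
    | "tenant-1:Basic" -> Ok (Some "Pending")
    | _                -> Ok None

// Cache-Aside: try the cache, fall back to the store, then populate the cache.
let getStatus (key: string) : Result<string option, string> =
    match cache.TryGetValue key with
    | true, value -> Ok (Some value)
    | false, _ ->
        match readFromStore key with
        | Error msg       -> Error msg
        | Ok None         -> Ok None
        | Ok (Some value) ->
            cache.[key] <- value
            Ok (Some value)

let first  = getStatus "tenant-1:Basic"   // cache miss: reads the store, fills the cache
let second = getStatus "tenant-1:Basic"   // cache hit: the store is not read again
```

After the second call storeReads is still 1, since the value was served from the dictionary. The sketch ignores expiration, which the Redis store logic further down handles with a 30-second timeout.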

CosmosDB Query Logic

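CosmosDB relies on a Container to retrieve documents. Accessing a CosmosDB container requires a client, a database name, and a container Id. Because the client below authenticates with DefaultAzureCredential, the value held in ConnectionString.Instance is expected to be the account endpoint URI rather than a key-based connection string.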

The following code exposes container ids and the logic to access a container:

namespace BeachMobile.BillableSubscription.DataGateway.Cosmos

open Microsoft.Azure.Cosmos
open Azure.Identity

type ConnectionString() = static member val Instance = "" with get,set

module Container =

    let registration       = "Registration"
    let registrationStatus = "RegistrationStatus"
    let payments           = "Payments"
    let paymentHistory     = "PaymentHistory"

module Database =

    let name = "beachmobile-db"

    module Container =

        let get (db:string) (containerId:string) =

            let client    = new CosmosClient(ConnectionString.Instance, DefaultAzureCredential())
            let database  = client.GetDatabase(db)
            let container = database.GetContainer(containerId)
            container

The following is the logic for querying registration status and payment history:

namespace BeachMobile.BillableSubscription.DataGateway.Cosmos

open Microsoft.Azure.Cosmos
open BeachMobile.BillableSubscription.Language
open BeachMobile.BillableSubscription.Operations
open BeachMobile.BillableSubscription.Entities
open BeachMobile.BillableSubscription.DataGateway.Common
open BeachMobile.BillableSubscription.DataGateway.Cosmos
open BeachMobile.BillableSubscription.DataGateway.Cosmos.Database
open BeachMobile.BillableSubscription.DataGateway.Redis

module Get =

    let status : GetRegistrationStatus =

        fun v -> task { 

            try
                let container = Container.get Database.name Container.registration

                match! container.ReadItemAsync<RegistrationStatusEntity>(v.id, PartitionKey(KeyFor.registrationStatus(v.Request.TenantId, v.Request.Plan))) |> Async.AwaitTask with
                | response when response.StatusCode = System.Net.HttpStatusCode.OK -> 
                    return Ok (Some response.Resource.Status)
                | _ -> return Error "Failed to retrieve registration status"

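            with
            // ReadItemAsync throws a CosmosException on a 404, so map "not found" to Ok None
            | :? CosmosException as ex when ex.StatusCode = System.Net.HttpStatusCode.NotFound -> return Ok None
            | ex -> return ex |> toError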
        }

    let paymentHistory : GetPaymentHistory = 

        fun v -> task {

            try
                let container = Container.get Database.name Container.paymentHistory

                match! container.ReadItemAsync<PaymentHistoryEntity>(v, PartitionKey(Container.paymentHistory)) |> Async.AwaitTask with
                | response when response.StatusCode = System.Net.HttpStatusCode.OK -> 
                    return Ok (Some response.Resource.Payments)
                | _ -> return Error "Failed to retrieve payment history"

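            with
            // ReadItemAsync throws a CosmosException on a 404, so map "not found" to Ok None
            | :? CosmosException as ex when ex.StatusCode = System.Net.HttpStatusCode.NotFound -> return Ok None
            | ex -> return ex |> toError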
        }

Redis Query Logic

Redis is a key/value store that keeps its data in a server's memory rather than on disk. The cache is queried as follows:

namespace BeachMobile.BillableSubscription.DataGateway.Redis

open Newtonsoft.Json
open StackExchange.Redis
open BeachMobile.BillableSubscription.Operations
open BeachMobile.BillableSubscription.Language

module Get =

    let status : GetRegistrationStatus =

        fun v -> task { 

            try
                let! connection = ConnectionMultiplexer.ConnectAsync(ConnectionString.Instance) |> Async.AwaitTask
                let cache       = connection.GetDatabase()
                let request     = v.Request
                let response    = cache.StringGet(KeyFor.registrationStatus(request.TenantId, request.Plan))

                // Close the connection before returning, whether or not the key was found
                do! connection.CloseAsync() |> Async.AwaitTask

                match response.HasValue with
                | false -> return Ok None
                | true  -> return Ok (response |> JsonConvert.DeserializeObject<RegistrationStatus> |> Some)

            with ex ->
                let msg = ex.GetBaseException().Message
                return Error msg
        }

    let paymentHistory : GetPaymentHistory = 

        fun v -> task { 

            let! connection = ConnectionMultiplexer.ConnectAsync(ConnectionString.Instance) |> Async.AwaitTask
            let cache    = connection.GetDatabase()

            // Read with the same key that Post.paymentHistory writes with
            let response = cache.StringGet(KeyFor.paymentHistory v)
            
            do! connection.CloseAsync() |> Async.AwaitTask

            match response.HasValue with
            | false -> return Ok None
            | true  ->
                // The cached JSON is a whole PaymentHistory record, so unwrap its payments
                let history = JsonConvert.DeserializeObject<PaymentHistory>(response.ToString())
                return Ok (Some history.Payments)
        }

CosmosDB Store Logic

namespace BeachMobile.BillableSubscription.DataGateway.Cosmos

open System
open System.Net
open BeachMobile.BillableSubscription.Entities
open BeachMobile.BillableSubscription.Operations
open BeachMobile.BillableSubscription.Language
open BeachMobile.BillableSubscription.DataGateway.Common
open BeachMobile.BillableSubscription.DataGateway.Cosmos
open BeachMobile.BillableSubscription.DataGateway.Cosmos.Database

// CosmosDB Permissions
(*
    $resourceGroupName="*****"
    $accountName="*****"
    $principalId="*****" # Often called Object ID
    $dataContributorRoleId="00000000-0000-0000-0000-000000000002"
    az cosmosdb sql role assignment create `
      --account-name $accountName `
      --resource-group $resourceGroupName `
      --scope "/" `
      --principal-id $principalId `
      --role-definition-id $dataContributorRoleId
*)

module Post =

    let registration : RequestRegistration = 
    
        fun v -> task {
            
            try
                let container = Container.get Database.name Container.registration

                let item : RegistrationRequestEntity = {
                    id = Guid.NewGuid() |> string
                    RegistrationRequest = v
                }

                let receipt : RegistrationReceipt = {
                    id = item.id
                    Request   = v
                    Timestamp = DateTime.UtcNow
                }

                let status : RegistrationStatus = {
                    Registration = receipt
                    Status       = "Pending"
                    Timestamp    = DateTime.UtcNow
                }

                let registration = status.Registration.Request

                let status : RegistrationStatusEntity = {
                    id = item.id
                    PartitionId = $"{registration.TenantId}:{registration.Plan}"
                    Status = status
                }

                match! container.CreateItemAsync<RegistrationStatusEntity>(status, Microsoft.Azure.Cosmos.PartitionKey(status.PartitionId)) |> Async.AwaitTask with
                | response when response.StatusCode = HttpStatusCode.Created ->
                        
                    return Ok { Registration = receipt
                                Status       = "Pending"
                                Timestamp    = receipt.Timestamp
                              }

                | response -> return Error (response.StatusCode.ToString())

            with ex -> return ex |> toError     
        }

    let payment : SubmitPayment = 
    
        fun v -> task {
            
            try
                let container = Container.get Database.name Container.payments

                let request : PaymentRequestEntity = {
                    id = Guid.NewGuid() |> string
                    PartitionId = v.Subscription.BillablePlan.Plan.Name
                    PaymentRequest = v
                }

                match! container.UpsertItemAsync<PaymentRequestEntity>(request) |> Async.AwaitTask with
                | response when response.StatusCode = HttpStatusCode.Created -> 
                    return Ok { Payment= v; Timestamp= DateTime.UtcNow }

                | response -> return Error (response.StatusCode.ToString())
            
            with ex -> return ex |> toError
        }

Redis Store Logic

namespace BeachMobile.BillableSubscription.DataGateway.Redis

open System
open Newtonsoft.Json
open BeachMobile.BillableSubscription.Language
open BeachMobile.BillableSubscription.Operations
open BeachMobile.BillableSubscription.Entities
open BeachMobile.BillableSubscription.DataGateway
open BeachMobile.BillableSubscription.DataGateway.Cosmos
open StackExchange.Redis

// Documentation:
// https://medium.com/@sadigrzazada20/getting-started-with-redis-in-c-using-stackexchange-redis-353a9d65a136
// https://stackoverflow.com/questions/60927540/add-expiry-to-redis-cache

module Post =

    let private register(cache:IDatabase) (key:string) (value:string) =

        match cache.StringSet(key, value) with
        | false -> Error Msg.failedCacheItemRegistration
        | true  -> 

            match cache.KeyExpire(key, new TimeSpan(0,0,30)) with
            | false -> Error Msg.failedSetexpiration
            | true  -> Ok ()

    let registration (v:RegistrationStatus) =

        async {
        
            let! connection = ConnectionMultiplexer.ConnectAsync(Redis.ConnectionString.Instance) |> Async.AwaitTask
            let cache = connection.GetDatabase()
                
            let json    = JsonConvert.SerializeObject(v)
            let receipt = v.Registration.Request
            let key     = KeyFor.registrationStatus(receipt.TenantId, receipt.Plan)

            match register cache key json with
            | Error msg -> 
                do! connection.CloseAsync() |> Async.AwaitTask
                return Error msg

            | Ok () ->

                do! connection.CloseAsync() |> Async.AwaitTask
                    
                return Ok()
        }

    let payment : SubmitPayment = 
    
        fun v ->
            task {
        
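                // Qualified with Redis. because the Cosmos namespace, opened above, also defines ConnectionString
                let! connection = ConnectionMultiplexer.ConnectAsync(Redis.ConnectionString.Instance) |> Async.AwaitTask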
                let cache = connection.GetDatabase()

                let data : PaymentRequestEntity = {
                    id = Guid.NewGuid() |> string
                    PartitionId = "hello-world"
                    PaymentRequest = v
                }

                let json = JsonConvert.SerializeObject(data)
                
                match register cache (KeyFor.payment data.id) json with
                | Error msg -> 
                    do! connection.CloseAsync() |> Async.AwaitTask
                    return Error msg

                | Ok () ->
                    do! connection.CloseAsync() |> Async.AwaitTask
                    return Ok { Payment= v; Timestamp= DateTime.UtcNow }
            }

    let paymentHistory (v:PaymentHistory) = 
    
        async {
        
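            // Qualified with Redis. because the Cosmos namespace, opened above, also defines ConnectionString
            let! connection = ConnectionMultiplexer.ConnectAsync(Redis.ConnectionString.Instance) |> Async.AwaitTask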
            let cache = connection.GetDatabase()

            let json = JsonConvert.SerializeObject(v)
                
            match register cache (KeyFor.paymentHistory v.SubscriptionId) json with
            | Error msg -> 
                do! connection.CloseAsync() |> Async.AwaitTask
                return Error msg

            | Ok () ->
                do! connection.CloseAsync() |> Async.AwaitTask
                return Ok()
        }

Appendix

The solution for this brain dump can be found on GitHub.

Specification

I prefer to declare the core business operations of a service before implementing them. The code below declares those operations as function types:

namespace BeachMobile.BillableSubscription

open System.Threading.Tasks
open Language

module Operations =

    type RequestRegistration   = RegistrationRequest -> Task<Result<RegistrationStatus        ,ErrorDescription>>
    type GetRegistrationStatus = RegistrationReceipt -> Task<Result<Option<RegistrationStatus>,ErrorDescription>>

    type SubmitPayment     = PaymentRequest -> Task<Result<SuccessfulPayment             ,ErrorDescription>>
    type GetPaymentHistory = SubscriptionId -> Task<Result<option<seq<SuccessfulPayment>>,ErrorDescription>>
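
One benefit of declaring operations as function types is that any function with a matching shape can stand in for the real data gateway. The sketch below assumes simplified stand-ins for SubscriptionId, ErrorDescription, and SuccessfulPayment (the real definitions live in the Language module and are not shown here), and stubHistory and countPayments are hypothetical names.

```fsharp
open System.Threading.Tasks

// Simplified, hypothetical stand-ins for the types in the Language module.
type SubscriptionId    = string
type ErrorDescription  = string
type SuccessfulPayment = { Amount: decimal }

// Same shape as the GetPaymentHistory operation declared above.
type GetPaymentHistory = SubscriptionId -> Task<Result<option<seq<SuccessfulPayment>>, ErrorDescription>>

// An in-memory stub that satisfies the operation's signature.
let stubHistory : GetPaymentHistory =
    fun subscriptionId -> task {
        match subscriptionId with
        | "sub-1" -> return Ok (Some (seq { yield { Amount = 9.99m } }))
        | _       -> return Ok None
    }

// Code written against the type does not care which implementation it receives.
let countPayments (getHistory: GetPaymentHistory) (subscriptionId: SubscriptionId) =
    task {
        match! getHistory subscriptionId with
        | Ok (Some payments) -> return Seq.length payments
        | Ok None            -> return 0
        | Error _            -> return -1
    }

let known   = (countPayments stubHistory "sub-1").Result
let unknown = (countPayments stubHistory "sub-2").Result
```

A test can pass stubHistory where production code would pass the Redis, CosmosDB, or combined query, since all of them share the same type.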

Redis Cache Configuration

namespace BeachMobile.BillableSubscription.DataGateway.Redis

module KeyFor =
    
    let payment(subscriptionId)            = $"Payment:{subscriptionId}"
    let registrationStatus(tenantId,plan)  = $"{tenantId}:{plan}"
    let paymentHistory(subscriptionId)     = $"PaymentHistory:{subscriptionId}"
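
The read and write paths must build their keys the same way, otherwise a cached value is never found. As a small illustration, the sketch below repeats the KeyFor module and prints the keys it produces; the tenant, plan, and subscription values are hypothetical.

```fsharp
// The KeyFor module from the listing above, repeated so the sketch is self-contained.
module KeyFor =

    let payment(subscriptionId)            = $"Payment:{subscriptionId}"
    let registrationStatus(tenantId,plan)  = $"{tenantId}:{plan}"
    let paymentHistory(subscriptionId)     = $"PaymentHistory:{subscriptionId}"

// Hypothetical identifiers, chosen only for illustration.
let statusKey  = KeyFor.registrationStatus("tenant-1", "Basic")
let historyKey = KeyFor.paymentHistory("sub-42")

printfn "%s" statusKey    // prints tenant-1:Basic
printfn "%s" historyKey   // prints PaymentHistory:sub-42
```

Note that the registration status key carries no prefix, so it relies on tenant and plan values never colliding with the prefixed keys.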
