Intro

This post attempts to document my understanding of Akka.NET’s fault-tolerance support.

Fault-tolerance appears to work out-of-the-box via the SupervisorStrategy method of a parent actor.

SupervisorStrategy

The SupervisorStrategy method appears to be a virtual method that allows default behavior to be overridden.

The following override provides the strategy used to manage faults raised by child actors:

        override this.SupervisorStrategy() =

            let NumberOfRetries = Nullable<int> 3
            let timeSpan = Nullable<TimeSpan> (TimeSpan.FromSeconds(5.0))
            let f = fun (ex:Exception) -> Directive.Stop

            OneForOneStrategy(NumberOfRetries, timeSpan, f) :> SupervisorStrategy

 

The method above prevents a failed child actor from restarting because of the explicit Stop directive. By default, the supervisor strategy uses the Restart directive, which is why fault-tolerance was observed out-of-the-box. Hence, commenting out the method above, or replacing Directive.Stop with Directive.Restart, appears to restore fault-tolerance.

Therefore, one could comment out the override like this:

//override this.SupervisorStrategy() =

    //let NumberOfRetries = Nullable<int> 3
    //let timeSpan = Nullable<TimeSpan> (TimeSpan.FromSeconds(5.0))
    //let f = fun (ex:Exception) -> Directive.Restart

    //OneForOneStrategy(NumberOfRetries, timeSpan, f) :> SupervisorStrategy

 

And still get the default behavior of restarting the actor. Alternatively, the Restart directive can be made explicit like this:

override this.SupervisorStrategy() =

    let NumberOfRetries = Nullable<int> 3
    let timeSpan = Nullable<TimeSpan> (TimeSpan.FromSeconds(5.0))
    let f = fun (ex:Exception) -> Directive.Restart

    OneForOneStrategy(NumberOfRetries, timeSpan, f) :> SupervisorStrategy
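
For reference, the decider function can also vary its directive based on the exception type rather than always returning the same one. The following is only a sketch in the same style as the overrides above; the choice of exception types is illustrative and not taken from the sample:

override this.SupervisorStrategy() =

    let NumberOfRetries = Nullable<int> 3
    let timeSpan = Nullable<TimeSpan> (TimeSpan.FromSeconds(5.0))

    // Sketch: stop the child on arithmetic faults, restart it on anything else.
    let f = fun (ex:Exception) ->
                match ex with
                | :? ArithmeticException -> Directive.Stop
                | _                      -> Directive.Restart

    OneForOneStrategy(NumberOfRetries, timeSpan, f) :> SupervisorStrategy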

SimpleClusterListener

SimpleClusterListener is the parent of the Storage actor.

namespace Samples.Cluster.Simple

open System
open Akka.Actor
open Akka.Cluster
open Akka.Event

type SimpleClusterListener() =
    inherit UntypedActor()

        override this.PreStart() =
            let cluster = Cluster.Get(UntypedActor.Context.System)
            let (events:System.Type array) = [| typeof<ClusterEvent.IMemberEvent>
                                                typeof<ClusterEvent.UnreachableMember> |]
            cluster.Subscribe(base.Self, ClusterEvent.InitialStateAsEvents, events)

            let storage = UntypedActor.Context.ActorOf<Storage>("storage")
            storage.Tell(Message "Hello from SimpleClusterListener")


        override this.OnReceive(message:obj) =

            let log = UntypedActor.Context.GetLogger()

            match message with
            | :? ClusterEvent.MemberUp          as e -> log.Info("Member is up: {0}",                   e.Member)
            | :? ClusterEvent.MemberRemoved     as e -> log.Info("Member is removed: {0}",              e.Member)
            | :? ClusterEvent.UnreachableMember as e -> log.Info("Member detected as unreachable: {0}", e.Member)                     
            | :? string                              -> log.Info(string message)
                                                        this.Sender.Tell(Message "Hello again from SimpleClusterListener")
            | _ -> ()

        override this.PostStop() = 
            let cluster = Akka.Cluster.Cluster.Get(UntypedActor.Context.System)
            cluster.Unsubscribe base.Self

        override this.SupervisorStrategy() =

            let NumberOfRetries = Nullable<int> 3
            let timeSpan = Nullable<TimeSpan> (TimeSpan.FromSeconds(5.0))
            let f = fun (ex:Exception) -> Directive.Restart

            OneForOneStrategy(NumberOfRetries, timeSpan, f) :> SupervisorStrategy

Storage Actor

The Storage actor deliberately throws an exception (after receiving its second message) so that its parent (SimpleClusterListener) has a fault to manage via its SupervisorStrategy.

namespace Samples.Cluster.Simple

open Akka.Actor
open Akka.Event

type Message = Message of string

type Storage() =

    inherit UntypedActor()

    [<DefaultValue>] static val mutable private counter : int

    override this.OnReceive(message:obj) =
        let log = UntypedActor.Context.GetLogger()

        match message with
        | :? Message as v -> let (Message msg) = v
                             let response = sprintf "Storage Received: %s" msg
                             log.Info(response)
                             this.Sender.Tell(response)

                             Storage.counter <- Storage.counter + 1
                             if Storage.counter = 2
                             then failwith "Testing SupervisorStrategy"
        | _ -> ()

Appendix

The following are the remaining pieces of the solution that I was working with.

Main

module Program

open System
open System.Configuration
open Akka.Configuration.Hocon
open Akka.Configuration
open Akka.Actor
open Samples.Cluster.Simple

[<Literal>]
let ExitWithSuccess = 0

let createActor port =
    let section = ConfigurationManager.GetSection "akka" :?> AkkaConfigurationSection
    let config =  ConfigurationFactory.ParseString(sprintf "akka.remote.helios.tcp.port=%s" port)
                                      .WithFallback(section.AkkaConfig)

    let system = ActorSystem.Create ("ClusterSystem", config)
    let props = Props.Create(typeof<SimpleClusterListener>)
    system.ActorOf(props, "clusterListener") |> ignore

let startUp (ports:string list) = ports |> List.iter createActor

[<EntryPoint>]
let main args =
    startUp ["2551" (*; "2552"; "0"*)]
    Console.WriteLine("Press any key to exit")
    Console.ReadLine() |> ignore
    ExitWithSuccess

App.config

<?xml version="1.0" encoding="utf-8"?>
<configuration>
  <configSections>
    <section name="akka" type="Akka.Configuration.Hocon.AkkaConfigurationSection, Akka"/>
  </configSections>
  <startup>
    <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5.2"/>
  </startup>
  <akka>
    <hocon>
      <![CDATA[
          akka {
            actor {
              provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
            }
            
            remote {
              log-remote-lifecycle-events = DEBUG
              helios.tcp {
                hostname = "localhost"
                port = 0
              }
            }

            cluster {
              seed-nodes = [
                "akka.tcp://ClusterSystem@localhost:2551",
                "akka.tcp://ClusterSystem@localhost:2552"]

              #auto-down-unreachable-after = 30s
            }
          }
      ]]>
    </hocon>
  </akka>
</configuration>

Intro

This post is intended to document my current understanding of the Cluster.Sharding feature within Akka.NET. This feature is in pre-release as of 3/17/2017.

I identified from this documentation that a cluster represents a fault-tolerant, elastic, decentralized, peer-to-peer network of programs. The Akka.NET framework provides a library that supports creating and managing a cluster.

Akka.Cluster.Sharding

Akkling.Cluster.Sharding is an open-source library that provides an F# API for Akka.NET’s cluster-sharding feature. I discovered this library after posting questions on Stack Overflow.

Within Akka.NET, four terms are essential for understanding how to implement cluster sharding:

  • Node
  • Shard
  • ShardRegion
  • Entity

What is a Node?

I assume a node is a service, identified by the combination of a server address and port number, that contains one or more shards.

What is a Shard?

A shard is a group of entities that are managed together within a clustered system.

What is a ShardRegion?

A shard region is responsible for locating the shard of an entity as well as delivering messages to that entity. If an entity is not available to receive a message, the ShardRegion will spawn a new entity so that the message can still be delivered.

What is an Entity?

An entity is essentially an actor that belongs to a cluster. However, in order to locate an entity, the shard region that hosts it must first be identified.
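
To tie these four terms together before looking at the sample: in the Akkling-based sample below, an entity is never messaged directly. A message is sent to a shard region as a (shardId, entityId, payload) tuple, and the region resolves (or spawns) the entity on whichever node currently hosts that shard. The snippet below only illustrates this addressing convention using names of my own choosing; the real wiring appears in the sample code that follows.

// Illustration only: the envelope shape used by the sample below.
// A node (e.g. akka.tcp://cluster-system@localhost:2551) hosts shard regions,
// a shard region hosts shards, and a shard hosts entity actors.
type ShardId  = string
type EntityId = string
type Envelope<'msg> = ShardId * EntityId * 'msg

let envelope : Envelope<string> = ("shard-1", "entity-1", "hello world 1")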

Sample Code

I took the sample code from the Akkling repo and made minor changes:

open System
open System.IO
#if INTERACTIVE
let cd = Path.Combine(__SOURCE_DIRECTORY__, "../src/Akkling.Cluster.Sharding/bin/Debug")
System.IO.Directory.SetCurrentDirectory(cd)
#endif

#r "../src/Akkling.Cluster.Sharding/bin/Debug/System.Collections.Immutable.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Hyperion.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Newtonsoft.Json.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/FSharp.PowerPack.Linq.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Helios.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/FsPickler.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Google.ProtocolBuffers.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Google.ProtocolBuffers.Serialization.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.Remote.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Google.ProtocolBuffers.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.Persistence.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.Cluster.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.Cluster.Tools.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.Cluster.Sharding.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.Serialization.Hyperion.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akkling.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akkling.Persistence.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akkling.Cluster.Sharding.dll"

open Akka.Actor
open Akka.Configuration
open Akka.Cluster
open Akka.Cluster.Tools.Singleton
open Akka.Cluster.Sharding
open Akka.Persistence

open Akkling
open Akkling.Persistence
open Akkling.Cluster
open Akkling.Cluster.Sharding
open Hyperion

let configWithPort port =
    let config = Configuration.parse ("""
        akka {
            actor {
              provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
              serializers {
                hyperion = "Akka.Serialization.HyperionSerializer, Akka.Serialization.Hyperion"
              }
              serialization-bindings {
                "System.Object" = hyperion
              }
            }
          remote {
            helios.tcp {
              public-hostname = "localhost"
              hostname = "localhost"
              port = """ + port.ToString() + """
            }
          }
          cluster {
            auto-down-unreachable-after = 5s
            seed-nodes = [ "akka.tcp://cluster-system@localhost:2551/" ]
          }
          persistence {
            journal.plugin = "akka.persistence.journal.inmem"
            snapshot-store.plugin = "akka.persistence.snapshot-store.local"
          }
        }
        """)
    config.WithFallback(ClusterSingletonManager.DefaultConfig())
 
let consumer (actor:Actor<_>) msg = printfn "\n%A received %s" (actor.Self.Path.ToStringWithAddress()) msg |> ignored
 
// spawn two separate systems with shard regions on each of them
 
let system1 = System.create "cluster-system" (configWithPort 2551)
let shardRegion1 = spawnSharded id system1 "printer" <| props (actorOf2 consumer)
 
// wait a while before starting a second system
System.Threading.Thread.Sleep(3000)
let system2 = System.create "cluster-system" (configWithPort 2552)
let shardRegion2 = spawnSharded id system2 "printer" <| props (actorOf2 consumer)
 
// send hello world to 4 entities spread across 2 different shards (i.e. 4 entities in total)
// NOTE: even though we sent all messages through a single shard region,
//       some of them will be executed on the second one thanks to shard balancing
System.Threading.Thread.Sleep(3000)
shardRegion1 <! ("shard-1", "entity-1", "hello world 1")
shardRegion1 <! ("shard-2", "entity-2", "hello world 2")
shardRegion1 <! ("shard-1", "entity-3", "hello world 3")
shardRegion1 <! ("shard-2", "entity-4", "hello world 4")
 
// check which shards have been built on each shard region
 
System.Threading.Thread.Sleep(3000)
 
open Akka.Cluster.Sharding
 
let printShards shardRegion =
    async {
        let! reply = (retype shardRegion) <? GetShardRegionStats.Instance
        let (stats: ShardRegionStats) = reply.Value
        for kv in stats.Stats do
            printfn "\tShard '%s' has %d entities on it" kv.Key kv.Value
    } |> Async.RunSynchronously
 
printfn "\nShards active on node 'localhost:2551':"
printShards shardRegion1
printfn "\nShards active on node 'localhost:2552':"
printShards shardRegion2

The output is the following:

Binding session to 'C:\Users\Snimrod\Desktop\Akkling-master\examples\../src/Akkling.Cluster.Sharding/bin/Debug/Akka.dll'...
Binding session to 'C:\Users\Snimrod\Desktop\Akkling-master\examples\../src/Akkling.Cluster.Sharding/bin/Debug/System.Collections.Immutable.dll'...
[INFO][3/15/2017 2:58:16 PM][Thread 0001][remoting] Starting remoting
[INFO][3/15/2017 2:58:16 PM][Thread 0001][remoting] Remoting started; listening on addresses : [akka.tcp://cluster-system@localhost:2551]
[INFO][3/15/2017 2:58:16 PM][Thread 0001][remoting] Remoting now listens on addresses: [akka.tcp://cluster-system@localhost:2551]
[INFO][3/15/2017 2:58:16 PM][Thread 0001][Cluster] Cluster Node [akka.tcp://cluster-system@localhost:2551] - Starting up...
[INFO][3/15/2017 2:58:16 PM][Thread 0001][Cluster] Cluster Node [akka.tcp://cluster-system@localhost:2551] - Started up successfully
[INFO][3/15/2017 2:58:16 PM][Thread 0007][[akka://cluster-system/system/cluster/core/daemon#1110691120]] Node [akka.tcp://cluster-system@localhost:2551] is JOINING, roles []
[INFO][3/15/2017 2:58:16 PM][Thread 0007][[akka://cluster-system/system/cluster/core/daemon#1110691120]] Leader is moving node [akka.tcp://cluster-system@localhost:2551] to [Up]
[INFO][3/15/2017 2:58:16 PM][Thread 0007][akka://cluster-system/user/sharding/printerCoordinator/singleton/coordinator] Message Register from akka://cluster-system/user/sharding/printer to akka://cluster-system/user/sharding/printerCoordinator/singleton/coordinator was not delivered. 1 dead letters encountered.
[INFO][3/15/2017 2:58:16 PM][Thread 0004][[akka://cluster-system/user/sharding/printerCoordinator#643222226]] Singleton manager [akka.tcp://cluster-system@localhost:2551] starting singleton actor
[INFO][3/15/2017 2:58:16 PM][Thread 0004][[akka://cluster-system/user/sharding/printerCoordinator#643222226]] ClusterSingletonManager state change [Start -> Oldest] Akka.Cluster.Tools.Singleton.Uninitialized
[INFO][3/15/2017 2:58:19 PM][Thread 0001][remoting] Starting remoting
[INFO][3/15/2017 2:58:19 PM][Thread 0001][remoting] Remoting started; listening on addresses : [akka.tcp://cluster-system@localhost:2552]
[INFO][3/15/2017 2:58:19 PM][Thread 0001][remoting] Remoting now listens on addresses: [akka.tcp://cluster-system@localhost:2552]
[INFO][3/15/2017 2:58:19 PM][Thread 0001][Cluster] Cluster Node [akka.tcp://cluster-system@localhost:2552] - Starting up...
[INFO][3/15/2017 2:58:19 PM][Thread 0001][Cluster] Cluster Node [akka.tcp://cluster-system@localhost:2552] - Started up successfully
[INFO][3/15/2017 2:58:19 PM][Thread 0008][[akka://cluster-system/system/cluster/core/daemon#1110691120]] Node [akka.tcp://cluster-system@localhost:2552] is JOINING, roles []
[INFO][3/15/2017 2:58:20 PM][Thread 0004][[akka://cluster-system/system/cluster/core/daemon#205575170]] Welcome from [akka.tcp://cluster-system@localhost:2551]
[INFO][3/15/2017 2:58:20 PM][Thread 0006][[akka://cluster-system/system/cluster/core/daemon#1110691120]] Leader is moving node [akka.tcp://cluster-system@localhost:2552] to [Up]
[INFO][3/15/2017 2:58:20 PM][Thread 0008][[akka://cluster-system/user/sharding/printerCoordinator#1529090680]] ClusterSingletonManager state change [Start -> Younger] Akka.Cluster.Tools.Singleton.Uninitialized

"akka://cluster-system/user/sharding/printer/shard-1/entity-1" received hello world 1

"akka://cluster-system/user/sharding/printer/shard-2/entity-2" received hello world 2

"akka://cluster-system/user/sharding/printer/shard-1/entity-3" received hello world 3

"akka://cluster-system/user/sharding/printer/shard-2/entity-4" received hello world 4

Shards active on node 'localhost:2551':
	Shard 'shard-2' has 2 entities on it

Shards active on node 'localhost:2552':
	Shard 'shard-1' has 2 entities on it

The code that sends the messages is the following:

shardRegion1 <! ("shard-1", "entity-1", "hello world 1")
shardRegion1 <! ("shard-2", "entity-2", "hello world 2")
shardRegion1 <! ("shard-1", "entity-3", "hello world 3")
shardRegion1 <! ("shard-2", "entity-4", "hello world 4")

Note how the lines above all send through the same shard region (i.e. shardRegion1). However, shard balancing will attempt to distribute the shards (and the entities they host) evenly across the two shard regions, as the output above shows.

In addition, if we append a message addressed to shard-3, as shown below:

System.Threading.Thread.Sleep(3000)
shardRegion1 <! ("shard-1", "entity-1", "hello world 1")
shardRegion1 <! ("shard-2", "entity-2", "hello world 2")
shardRegion1 <! ("shard-1", "entity-3", "hello world 3")
shardRegion1 <! ("shard-2", "entity-4", "hello world 4")

shardRegion1 <! ("shard-3", "entity-5", "hello world 4")

Then we can observe the updated distribution across the nodes of the cluster:

"akka://cluster-system/user/sharding/printer/shard-1/entity-3" received hello world 3

"akka://cluster-system/user/sharding/printer/shard-3/entity-5" received hello world 4

"akka://cluster-system/user/sharding/printer/shard-1/entity-1" received hello world 1

"akka://cluster-system/user/sharding/printer/shard-2/entity-2" received hello world 2

"akka://cluster-system/user/sharding/printer/shard-2/entity-4" received hello world 4

Shards active on node 'localhost:2551':
	Shard 'shard-3' has 1 entities on it
	Shard 'shard-1' has 2 entities on it

Shards active on node 'localhost:2552':
	Shard 'shard-2' has 2 entities on it

Conclusion

The End.

Today I learned how to implement a method that accepts a variable number of arguments of the same type.

In C#, we use the “params” keyword.

In F#, we use the ParamArray attribute.

open System

type MyClass() =
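    // [<ParamArray>] marks the final parameter so that callers can pass any number of
    // (string * string) tuples; inside the method, args arrives as a single array.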

    member this.SomeMethod([<ParamArray>] args : (string * string) array) = ()
    

let someInstance = MyClass()

someInstance.SomeMethod( ("Table1","Column1"),
                         ("Table2","Column2"),
                         ("Table3","Column3"),
                         ("Table4","Column4") )

The example above demonstrates how a variable number of typed arguments can be passed to a method. The End.

Intro

As I document my journey of learning F#, I have identified alternative methods for retrieving a value from a single-case Discriminated Union.

Let’s examine the following code:

module SomeModule

type Email = Email of string

let email = Email "scott@abc.com"

With the code above, how can we retrieve the actual email address?
I have identified various approaches (i.e. syntax) to accomplish this task.

Method #1:

let address1 = match email with
               | Email v -> v

The code above is what I’m used to seeing in examples. However, it feels like a bit of overkill.

Method #2:

let address2 = email |> function
               | Email v -> v

The code above is less verbose than method #1. However, it still requires two lines of code.

Method #3:

let address3 = email |> function Email v -> v

The code above has been consolidated to just one line. However, that one line still feels a bit cryptic.

Method #4:

let (Email address4) = email

The code above is simple and somewhat concise. But it can be even shorter…

Method #5:

let Email address4 = email

The code above appears to be the most concise.

Conclusion

In conclusion, I have identified several alternative methods for retrieving a value from a single-case Discriminated Union. The End.

As I document my journey of learning F#, I have learned that a value tagged with a Unit of Measure (UOM) can still interact with 3rd-party libraries that do not support the Unit of Measure feature. I learned of this technique via the following link.

Example:

open System

[<Measure>] type ms

// let absoluteValue1 = Math.Abs (100<ms>)  // Won't compile
let absoluteValue2 = Math.Abs (int 100<ms>)

In conclusion, I have learned to simply convert the value wrapped by a UOM to the underlying data type that’s required by a 3rd-party library (here, via the int conversion function).
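
Going the other direction is also possible: once a plain value comes back from the 3rd-party call, the measure can be re-applied. The following is a minimal sketch that continues the example above and uses the standard LanguagePrimitives helper to restore the measure:

let elapsed = 100<ms>
let absolute = Math.Abs (int elapsed)                                   // plain int for the library call
let restored : int<ms> = LanguagePrimitives.Int32WithMeasure absolute   // re-apply the ms measure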

My understanding is that the “use” statement guarantees that the Dispose method of IDisposable will get invoked when the binding goes out of scope (e.g., before exiting the function), even if an exception is thrown.

Disposable Example:

open System

type SomeDisposable() =

    interface IDisposable with
        member this.Dispose() = ()

    member this.Divide() = 
        try 0/0
        with ex -> raise ex

Test:
I have observed that the Dispose method still gets triggered even when an exception is thrown.

module xxx =

    open NUnit.Framework
    open FsUnit

    [<Test>]
    let ``Some Test``() =
        use something = new SomeDisposable()
        something.Divide() |> ignore
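
For reference, my understanding is that “use” is roughly sugar for a let binding wrapped in try/finally, which is why Dispose still runs even though Divide throws. The following hand-written equivalent is only a sketch of that expansion (not the compiler’s actual output); like the original test, it will still surface the exception, but Dispose is invoked first:

    [<Test>]
    let ``Some Test (desugared)``() =
        let something = new SomeDisposable()
        try
            something.Divide() |> ignore
        finally
            // this is what the 'use' binding arranges automatically
            (something :> System.IDisposable).Dispose()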