F#: Akka.Net (Fault Tolerance)

Intro

This post attempts to document my understanding of Akka.Net’s fault-tolerance support.

Fault tolerance appears to work out of the box: a parent actor supervises the child actors it creates, and its SupervisorStrategy method decides what happens when a child throws an exception.

SupervisorStrategy

SupervisorStrategy is a virtual method, so a parent actor can override it to replace the default behavior.

The following override returns the strategy that the parent uses to handle faults in its child actors. Its decider returns Directive.Stop for every exception:

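        override this.SupervisorStrategy() =

            // Restart limits: at most 3 restarts within 5 seconds
            // (they never come into play here, because the decider always stops)
            let NumberOfRetries = Nullable<int> 3
            let timeSpan = Nullable<TimeSpan> (TimeSpan.FromSeconds(5.0))
            // Decider: maps the child's exception to a directive
            let f = fun (ex:Exception) -> Directive.Stop

            OneForOneStrategy(NumberOfRetries, timeSpan, f) :> SupervisorStrategy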

 

The method above prevents the failing child from restarting, because its decider explicitly returns the Stop directive. Without an override, the default supervisor strategy applies the Restart directive to ordinary exceptions, which is why fault tolerance was observed by default. Hence, either commenting out the method above or replacing Directive.Stop with Directive.Restart appears to enable fault tolerance.

Therefore, one could comment out the override entirely, like this:

//override this.SupervisorStrategy() =

    //let NumberOfRetries = Nullable<int> 3
    //let timeSpan = Nullable<TimeSpan> (TimeSpan.FromSeconds(5.0))
    //let f = fun (ex:Exception) -> Directive.Restart

    //OneForOneStrategy(NumberOfRetries, timeSpan, f) :> SupervisorStrategy

 

And still get the default restart behavior, which matches the following explicit override except that the override also caps restarts at 3 within 5 seconds:

override this.SupervisorStrategy() =

    let NumberOfRetries = Nullable<int> 3
    let timeSpan = Nullable<TimeSpan> (TimeSpan.FromSeconds(5.0))
    let f = fun (ex:Exception) -> Directive.Restart

    OneForOneStrategy(NumberOfRetries, timeSpan, f) :> SupervisorStrategy
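In both overrides above, the decider returns one directive for every exception. Because the decider is just a function from an exception to a directive, it can also match on the exception type. The sketch below is stand-alone so that it runs without the Akka.Net package: the Directive union is a local stand-in that mirrors the Resume, Restart, Stop and Escalate cases of Akka.Actor.Directive, and the particular exception-to-directive mapping is hypothetical. In an actor you would return the real Directive values and pass the function as the third argument to OneForOneStrategy, the way f is passed above.

```fsharp
// Local stand-in for Akka.Actor.Directive (assumption: declared here only so
// this sketch compiles and runs without referencing the Akka.Net package).
type Directive =
    | Resume
    | Restart
    | Stop
    | Escalate

// A decider maps each exception to a directive. The mapping below is
// hypothetical and only illustrates matching on the exception type.
let decider (ex: exn) : Directive =
    match ex with
    | :? System.ArgumentException -> Resume           // keep the child and its state
    | :? System.InvalidOperationException -> Restart  // replace the child with a fresh instance
    | :? System.NotSupportedException -> Stop         // stop the child permanently
    | _ -> Escalate                                   // fail the supervisor, letting its parent decide

printfn "%A" (decider (System.ArgumentException "bad input"))   // prints Resume
```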

SimpleClusterListener

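SimpleClusterListener is the parent of the Storage actor: it creates Storage as a child in PreStart, so its SupervisorStrategy decides what happens when Storage throws.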

namespace Samples.Cluster.Simple

open System
open Akka.Actor
open Akka.Cluster
open Akka.Event

type SimpleClusterListener() =
    inherit UntypedActor()

    override this.PreStart() =
        // Subscribe to cluster membership and reachability events
        let cluster = Cluster.Get(UntypedActor.Context.System)
        let events = [| typeof<ClusterEvent.IMemberEvent>; typeof<ClusterEvent.UnreachableMember> |]
        cluster.Subscribe(base.Self, ClusterEvent.InitialStateAsEvents, events)

        // Create Storage as a child, so this actor supervises it
        let storage = UntypedActor.Context.ActorOf<Storage>("storage")
        storage.Tell(Message "Hello from SimpleClusterListener")

    override this.OnReceive(message:obj) =

        let log = UntypedActor.Context.GetLogger()

        match message with
        | :? ClusterEvent.MemberUp          as e -> log.Info("Member is up: {0}",                   e.Member)
        | :? ClusterEvent.MemberRemoved     as e -> log.Info("Member is removed: {0}",              e.Member)
        | :? ClusterEvent.UnreachableMember as e -> log.Info("Member detected as unreachable: {0}", e.Member)
        | :? string ->
            // Storage replies with a string; answer it with another Message
            log.Info(string message)
            this.Sender.Tell(Message "Hello again from SimpleClusterListener")
        | _ -> ()

    override this.PostStop() =
        let cluster = Akka.Cluster.Cluster.Get(UntypedActor.Context.System)
        cluster.Unsubscribe base.Self

    override this.SupervisorStrategy() =

        // Restart a failing child, at most 3 times within 5 seconds
        let NumberOfRetries = Nullable<int> 3
        let timeSpan = Nullable<TimeSpan> (TimeSpan.FromSeconds(5.0))
        let f = fun (ex:Exception) -> Directive.Restart

        OneForOneStrategy(NumberOfRetries, timeSpan, f) :> SupervisorStrategy
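The first two arguments to OneForOneStrategy limit restarts: with 3 and 5 seconds, a child may be restarted at most three times within a five-second window, and if it fails more often than that within the window, the supervisor stops it instead of restarting it. The following stand-alone sketch models that rule as I understand it. It is a simplification rather than Akka.Net's actual implementation, and RetryWindow, allowRestart and decisions are hypothetical names. Times are plain numbers of seconds to keep the example deterministic.

```fsharp
/// Restart bookkeeping for one child (hypothetical type):
/// when the current window opened, and how many failures it has seen.
type RetryWindow = { Start: float option; Count: int }

/// Decide whether a failure at time `now` (in seconds) may be handled by a
/// restart, returning the decision and the updated window.
let allowRestart (maxRetries: int) (withinSeconds: float) (state: RetryWindow) (now: float) : bool * RetryWindow =
    match state.Start with
    | Some start when now - start <= withinSeconds ->
        // Still inside the current window: count this failure against the limit
        let count = state.Count + 1
        count <= maxRetries, { state with Count = count }
    | _ ->
        // First failure, or the previous window has expired: open a new window
        1 <= maxRetries, { Start = Some now; Count = 1 }

/// Replay a list of failure times and report, for each one, whether a restart is allowed.
let decisions maxRetries withinSeconds (failureTimes: float list) : bool list =
    failureTimes
    |> List.mapFold (allowRestart maxRetries withinSeconds) { Start = None; Count = 0 }
    |> fst

// Four failures within 5 seconds: the fourth exceeds the limit of 3
printfn "%A" (decisions 3 5.0 [0.0; 1.0; 2.0; 3.0])   // [true; true; true; false]
// The fourth failure arrives after the window has expired, so a new window opens
printfn "%A" (decisions 3 5.0 [0.0; 1.0; 2.0; 6.0])   // [true; true; true; true]
```

In the real supervisor, a false here corresponds to the child being stopped, so no later failures would arrive for that child.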

Storage Actor

The Storage actor deliberately throws an exception while handling its second Message, so that its parent (SimpleClusterListener) has a failure to manage via its SupervisorStrategy.

namespace Samples.Cluster.Simple

open Akka.Actor
open Akka.Event

type Message = Message of string

type Storage() =

    inherit UntypedActor()

    // Static field: shared by all Storage instances, so it is not reset when the actor restarts
    [<DefaultValue>] static val mutable private counter : int

    override this.OnReceive(message:obj) =
        let log = UntypedActor.Context.GetLogger()

        match message with
        | :? Message as v -> let (Message msg) = v
                             let response = sprintf "Storage Received: %s" msg
                             log.Info(response)
                             // Reply to the sender with a plain string
                             this.Sender.Tell(response)

                             // Fail on the second Message to exercise the parent's SupervisorStrategy
                             Storage.counter <- Storage.counter + 1
                             if Storage.counter = 2
                             then failwith "Testing SupervisorStrategy"
        | _ -> ()
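Storage keeps its counter in a static field rather than an instance field. My understanding of why this matters: when a child is restarted, Akka.Net discards the failed actor instance and creates a new one, so instance state starts over while static state does not. With an instance counter, Storage would throw on every second Message it receives after each restart, rather than just once. The stand-alone sketch below models a restart as simply constructing a new object. The Counter type is hypothetical, and it uses static let where Storage uses static val mutable, although both give one field shared by all instances.

```fsharp
// Hypothetical actor-like class with one instance field and one static field.
type Counter() =
    static let mutable sharedCount = 0
    let mutable instanceCount = 0
    member this.Receive() =
        instanceCount <- instanceCount + 1
        sharedCount <- sharedCount + 1
        instanceCount, sharedCount

// Two messages to the first instance...
let first = Counter()
first.Receive() |> ignore
let beforeRestart = first.Receive()

// ...then a "restart": a fresh instance. The instance count starts over,
// while the static count carries on from where it was.
let restarted = Counter()
let afterRestart = restarted.Receive()

printfn "before restart: %A, after restart: %A" beforeRestart afterRestart
// prints: before restart: (2, 2), after restart: (1, 3)
```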

Appendix

The following shows the remaining parts of the solution I was working on.

Main

module Program

open System
open System.Configuration
open Akka.Configuration.Hocon
open Akka.Configuration
open Akka.Actor
open Samples.Cluster.Simple

[<Literal>]
let ExitWithSuccess = 0

// Start one cluster node: an ActorSystem bound to the given port,
// with a SimpleClusterListener as its top-level actor
let createActor port =
    let section = ConfigurationManager.GetSection "akka" :?> AkkaConfigurationSection
    // Take the port from the argument and everything else from App.config
    let config =  ConfigurationFactory.ParseString(sprintf "akka.remote.helios.tcp.port=%s" port)
                                      .WithFallback(section.AkkaConfig)

    let system = ActorSystem.Create ("ClusterSystem", config)
    let props = Props.Create(typeof<SimpleClusterListener>)
    system.ActorOf(props, "clusterListener") |> ignore

let startUp (ports:string list) = ports |> List.iter createActor

[<EntryPoint>]
let main args =
    startUp ["2551" (*; "2552"; "0"*)]
    Console.WriteLine("Press any key to exit")
    Console.ReadLine() |> ignore
    ExitWithSuccess

App.config

<?xml version="1.0" encoding="utf-8"?>
<configuration>
  <configSections>
    <section name="akka" type="Akka.Configuration.Hocon.AkkaConfigurationSection, Akka"/>
  </configSections>
  <startup>
    <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5.2"/>
  </startup>
  <akka>
    <hocon>
      <![CDATA[
          akka {
            actor {
              provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
            }
            
            remote {
              log-remote-lifecycle-events = DEBUG
              helios.tcp {
                hostname = "localhost"
                port = 0
              }
            }

            cluster {
              seed-nodes = [
                "akka.tcp://ClusterSystem@localhost:2551",
                "akka.tcp://ClusterSystem@localhost:2552"]

              #auto-down-unreachable-after = 30s
            }
          }
      ]]>
    </hocon>
  </akka>
</configuration>