Intro

This post documents my understanding of load balancing remote-deployed actors using the documented OOP support in the Akka.Net framework.

Setup

My solution had the following projects:

  • System1
  • System2
  • Shared

System1

The System1 project is an executable that remote-deploys actors to a separate process.

The following reflects how remote actors get deployed onto a separate process:


use system = ActorSystem.Create("system1", config)

let reply = system.ActorOf<ReplyActor>("reply")

let props1 = Props.Create(typeof<SomeActor>, [||])

let props2 = Props.Create(typeof<SomeActor>, [||])

let props3 = Props.Create(typeof<SomeActor>, [||])

let remote1 = system.ActorOf(props1.WithRouter(FromConfig.Instance), "remoteactor1")

let remote2 = system.ActorOf(props2.WithRouter(FromConfig.Instance), "remoteactor2")

let remote3 = system.ActorOf(props3.WithRouter(FromConfig.Instance), "remoteactor3")

In order for this code to execute successfully, the following configuration is required:

        actor {
            provider = ""Akka.Remote.RemoteActorRefProvider, Akka.Remote""

            debug {
              receive = on
              autoreceive = on
              lifecycle = on
              event-stream = on
              unhandled = on
            }

            deployment {
                /localactor {
                    router = consistent-hashing-pool
                    nr-of-instances = 5
                    virtual-nodes-factor = 10
                }
                /remoteactor1 {
                    router = consistent-hashing-pool
                    nr-of-instances = 5
                    remote = ""akka.tcp://system2@localhost:8080""
                }
                /remoteactor2 {
                    router = consistent-hashing-pool
                    nr-of-instances = 5
                    remote = ""akka.tcp://system2@localhost:8080""
                }
                /remoteactor3 {
                    router = consistent-hashing-pool
                    nr-of-instances = 5
                    remote = ""akka.tcp://system2@localhost:8080""
                }
            }
        }

Once the actors are remote-deployed to the separate process, they can receive messages. Akka.Net distributes messages among actors through router types, which is how it supports load balancing. This example uses a group router called ConsistentHashingGroup, which picks the routee for each message from the message's hash key, so messages with the same key are routed to the same actor. Each actor's router settings must appear in the deployment configuration, as they do in the configuration above.
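
To make the idea of routing by hash key concrete, here is a minimal, library-free sketch of a consistent-hash ring. It is an illustration only and not Akka.Net's implementation: the FNV-1a hash, the `buildRing` and `route` helpers, and the `"name#n"` virtual-node labels are all assumptions made for this example. The routee names and the virtual-node count of 10 are borrowed from the configuration above.

```fsharp
open System
open System.Text

// Deterministic 32-bit FNV-1a hash (an assumption for this sketch).
// .NET's String.GetHashCode varies between processes, so it is avoided here.
let fnv1a (text: string) : uint32 =
    let mutable hash = 2166136261u
    for b in Encoding.UTF8.GetBytes text do
        hash <- (hash ^^^ uint32 b) * 16777619u
    hash

// Build the ring: every routee contributes `virtualNodes` points, sorted by hash.
let buildRing (virtualNodes: int) (routees: string list) : (uint32 * string) [] =
    [| for routee in routees do
         for v in 1 .. virtualNodes do
             yield (fnv1a (sprintf "%s#%d" routee v), routee) |]
    |> Array.sortBy fst

// Route a key to the first ring point at or after the key's hash,
// wrapping around to the first point when the hash is past the last one.
let route (ring: (uint32 * string) []) (key: obj) : string =
    let h = fnv1a (string key)
    match ring |> Array.tryFind (fun (point, _) -> point >= h) with
    | Some (_, routee) -> routee
    | None -> snd ring.[0]

let ring = buildRing 10 [ "remoteactor1"; "remoteactor2"; "remoteactor3" ]

// Send the same eight keys twice; each key resolves to the same routee both times.
for round in 0 .. 1 do
    for key in 0 .. 7 do
        printfn "round %d: key %d -> %s" round key (route ring (box key))
```

Each key resolves to the same routee in both rounds, because the ring and the hash are deterministic. The specific key-to-routee assignment will not match Akka.Net's, since its hash function and ring layout differ from this sketch.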

With that configuration in place, a ConsistentHashingGroup instance can be created and the remote actors added to it as routees:

let hashGroup = system.ActorOf(Props.Empty.WithRouter(ConsistentHashingGroup(config)))
Task.Delay(500).Wait()

let routee1 = Routee.FromActorRef(remote1)
hashGroup.Tell(AddRoutee(routee1))

let routee2 = Routee.FromActorRef(remote2)
hashGroup.Tell(AddRoutee(routee2))

let routee3 = Routee.FromActorRef(remote3)
hashGroup.Tell(AddRoutee(routee3))

Once we have a message type defined that implements IConsistentHashable, we can send messages:

Task.Delay(500).Wait();

for i = 0 to 5 do
    for j = 0 to 7 do
        let message = HashMessage(j, sprintf "remote message: %i" j)
        hashGroup.Tell(message, reply)

System2

System2 is the process that the actors are ultimately deployed to, even though the deployment is initiated from a different process and/or machine (System1). The reason for doing this is to support fault tolerance and load balancing.

The following code was written for the System2 executable:

open Akka.Configuration
open Akka.Actor
open System

[<Literal>]
let EndWithSuccess = 0

[<EntryPoint>]
let main argv = 

    let config = ConfigurationFactory.ParseString(@"
        akka {
            log-config-on-start = on
            stdout-loglevel = DEBUG
            loglevel = DEBUG
            actor {
                provider = ""Akka.Remote.RemoteActorRefProvider, Akka.Remote""

                debug {
                  receive = on
                  autoreceive = on
                  lifecycle = on
                  event-stream = on
                  unhandled = on
                }
            }
            remote {
                helios.tcp {
                    port = 8080
                    hostname = localhost
                }
            }
        }
        ")

    use system = ActorSystem.Create("system2", config)
    Console.ReadLine() |> ignore

    EndWithSuccess

Shared

Both System1 and System2 reference the same actor type and message type. To remote-deploy an actor and have it receive messages in a separate process, that process must know how to construct the actor as well as the message types it handles. A shared assembly containing the actor type and the message type provides this.

The following actor type was added to a shared project for both executables (i.e. processes) to reference:

open System
open Akka.Actor

type SomeActor() =

    inherit UntypedActor()

    override this.OnReceive(msg:obj) =

        // Print the routee's full path together with the message content.
        match msg with
        | :? HashMessage as m ->
            let address = this.Self.Path.ToStringWithAddress()
            let content = sprintf "%s got %s" address m.Content
            Console.WriteLine(content)
        | _ -> ()

        // Print the sender of every message, whatever its type.
        let sender = UntypedActor.Context.Sender
        let content = sprintf "Message from %A - %A" sender.Path msg
        Console.WriteLine(content)

The following message type was added to a shared project for both executables (i.e. processes) to reference:

open Akka.Routing
open System

type HashMessage(id:int, content:string) =

    interface IConsistentHashable with

        member this.ConsistentHashKey:obj = box id

    member val Content  = content

Output

The output below shows messages being distributed across the actors based on the message key; each key is consistently delivered to the same routee:

akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor3/$d got remote message: 0
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor2/$b got remote message: 1
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor1/$e got remote message: 3
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor3/$d got remote message: 4
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor3/$d got remote message: 5
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor1/$e got remote message: 3
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor1/$e got remote message: 3
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor1/$e got remote message: 3
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor1/$e got remote message: 3
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor1/$e got remote message: 3
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor3/$d got remote message: 0
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor3/$d got remote message: 4
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor3/$d got remote message: 5
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor3/$d got remote message: 0
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor3/$d got remote message: 4
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor3/$d got remote message: 5
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor3/$d got remote message: 0
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor3/$d got remote message: 4
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor3/$d got remote message: 5
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor3/$d got remote message: 0
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor3/$d got remote message: 4
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor3/$d got remote message: 5
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor3/$d got remote message: 0
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor3/$d got remote message: 4
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor3/$d got remote message: 5
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor2/$b got remote message: 2
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor2/$b got remote message: 6
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor2/$b got remote message: 7
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor2/$b got remote message: 1
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor2/$b got remote message: 2
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor2/$b got remote message: 6
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor2/$b got remote message: 7
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor2/$b got remote message: 1
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor2/$b got remote message: 2
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor2/$b got remote message: 6
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor2/$b got remote message: 7
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor2/$b got remote message: 1
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor2/$b got remote message: 2
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor2/$b got remote message: 6
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor2/$b got remote message: 7
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor2/$b got remote message: 1
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor2/$b got remote message: 2
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor2/$b got remote message: 6
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor2/$b got remote message: 7
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor2/$b got remote message: 1
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor2/$b got remote message: 2
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor2/$b got remote message: 6
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor2/$b got remote message: 7

Conclusion

This post documented my understanding of load balancing remote-deployed actors using the documented OOP support in the Akka.Net framework. Successfully deploying actors onto a separate process and/or machine requires configuration, a router, and a shared reference to the actor and message types. Once these are in place, messages can be routed and processed.

Intro

This post attempts to document my understanding of Akka.Net’s fault-tolerance support.

Fault-tolerance appears to work out-of-the-box via the SupervisorStrategy method of a parent actor.

SupervisorStrategy

The SupervisorStrategy method appears to be a virtual method that allows default behavior to be overridden.

The following override defines the strategy for handling faults raised by child actors:

        override this.SupervisorStrategy() =

            let NumberOfRetries = Nullable<int> 3
            let timeSpan = Nullable<TimeSpan> (TimeSpan.FromSeconds(5.0))
            let f = fun (ex:Exception) -> Directive.Stop

            OneForOneStrategy(NumberOfRetries, timeSpan, f) :> SupervisorStrategy

 

The method above prevents a failed actor from restarting by returning the explicit Stop directive. By default, the supervisor strategy uses the Restart directive, which is why fault tolerance was observed without any override. Consequently, either commenting out the method above or replacing Directive.Stop with Directive.Restart appears to enable fault tolerance.

Therefore, one could comment out the override entirely, like this:

//override this.SupervisorStrategy() =

    //let NumberOfRetries = Nullable<int> 3
    //let timeSpan = Nullable<TimeSpan> (TimeSpan.FromSeconds(5.0))
    //let f = fun (ex:Exception) -> Directive.Restart

    //OneForOneStrategy(NumberOfRetries, timeSpan, f) :> SupervisorStrategy

 

This still achieves the default behavior of restarting an actor, just as the following explicit override does:

override this.SupervisorStrategy() =

    let NumberOfRetries = Nullable<int> 3
    let timeSpan = Nullable<TimeSpan> (TimeSpan.FromSeconds(5.0))
    let f = fun (ex:Exception) -> Directive.Restart

    OneForOneStrategy(NumberOfRetries, timeSpan, f) :> SupervisorStrategy
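
The overrides above also pass a retry limit (3) and a time window (5 seconds) to OneForOneStrategy, which this post does not otherwise explore. The sketch below is a simplified, library-free model of how such a limit could interact with the Restart directive. It is an assumption-laden illustration rather than Akka.Net's code: the `Decision`, `RetryState`, and `onFailure` names are hypothetical, and the exact bookkeeping inside Akka.Net may differ.

```fsharp
open System

// Stand-in for the two Akka.Net directives discussed in this post.
type Decision =
    | Restart
    | Stop

// Supervisor bookkeeping: when the current window began and how many
// restarts have been counted inside it.
type RetryState =
    { WindowStart: TimeSpan option
      Restarts: int }

let initial = { WindowStart = None; Restarts = 0 }

// Decide what happens to a child that failed at time `now`
// (measured from an arbitrary origin).
let onFailure (maxRetries: int) (window: TimeSpan) (state: RetryState) (now: TimeSpan) =
    match state.WindowStart with
    | Some start when now - start <= window ->
        // Still inside the window: count this restart against the limit.
        let restarts = state.Restarts + 1
        let decision = if restarts <= maxRetries then Restart else Stop
        decision, { state with Restarts = restarts }
    | _ ->
        // First failure, or the previous window expired: start a fresh window.
        Restart, { WindowStart = Some now; Restarts = 1 }

// Four failures one second apart, using the limits from the override above.
let failures =
    [ 0.0; 1.0; 2.0; 3.0 ] |> List.map (fun (s: float) -> TimeSpan.FromSeconds s)

let decisions =
    failures
    |> List.mapFold (fun state now -> onFailure 3 (TimeSpan.FromSeconds 5.0) state now) initial
    |> fst

printfn "%A" decisions
```

Under this model the first three failures are answered with Restart and the fourth, still inside the 5-second window, with Stop. A failure that arrives after the window has expired starts a new window and is restarted again.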

SimpleClusterListener

SimpleClusterListener is the parent of the Storage actor.

namespace Samples.Cluster.Simple

open System
open Akka.Actor
open Akka.Cluster
open Akka.Event

type SimpleClusterListener() =
    inherit UntypedActor()

        override this.PreStart() =
            let cluster = Cluster.Get(UntypedActor.Context.System)
            let (events:System.Type array) = [| typeof<ClusterEvent.IMemberEvent>
                                                typeof<ClusterEvent.UnreachableMember> |]
            cluster.Subscribe(base.Self, ClusterEvent.InitialStateAsEvents, events)

            let storage = UntypedActor.Context.ActorOf<Storage>("storage")
            storage.Tell(Message "Hello from SimpleClusterListener")


        override this.OnReceive(message:obj) =

            let log = UntypedActor.Context.GetLogger()

            match message with
            | :? ClusterEvent.MemberUp          as e -> log.Info("Member is up: {0}",                   e.Member)
            | :? ClusterEvent.MemberRemoved     as e -> log.Info("Member is removed: {0}",              e.Member)
            | :? ClusterEvent.UnreachableMember as e -> log.Info("Member detected as unreachable: {0}", e.Member)                     
            | :? string                              -> log.Info(string message)
                                                        this.Sender.Tell(Message "Hello again from SimpleClusterListener")
            | _ -> ()

        override this.PostStop() = 
            let cluster = Akka.Cluster.Cluster.Get(UntypedActor.Context.System)
            cluster.Unsubscribe base.Self

        override this.SupervisorStrategy() =

            let NumberOfRetries = Nullable<int> 3
            let timeSpan = Nullable<TimeSpan> (TimeSpan.FromSeconds(5.0))
            let f = fun (ex:Exception) -> Directive.Restart

            OneForOneStrategy(NumberOfRetries, timeSpan, f) :> SupervisorStrategy

Storage Actor

The Storage actor deliberately throws an exception on the second message it receives, so that its parent (SimpleClusterListener) can handle the failure via its SupervisorStrategy.

namespace Samples.Cluster.Simple

open Akka.Actor
open Akka.Event

type Message = Message of string

type Storage() =

    inherit UntypedActor()

    [<DefaultValue>] static val mutable private counter : int

    override this.OnReceive(message:obj) =
        let log = UntypedActor.Context.GetLogger()

        match message with
        | :? Message as v -> let (Message msg) = v
                             let response = sprintf "Storage Received: %s" msg
                             log.Info(response)
                             this.Sender.Tell(response)

                             Storage.counter <- Storage.counter + 1
                             if Storage.counter = 2
                             then failwith "Testing SupervisorStrategy"
        | _ -> ()

Appendix

The following represents aspects of the solution that I was working on.

Main

module Program

open System
open System.Configuration
open Akka.Configuration.Hocon
open Akka.Configuration
open Akka.Actor
open Samples.Cluster.Simple

[<Literal>]
let ExitWithSuccess = 0

let createActor port =
    let section = ConfigurationManager.GetSection "akka" :?> AkkaConfigurationSection
    let config =  ConfigurationFactory.ParseString(sprintf "akka.remote.helios.tcp.port=%s" port)
                                      .WithFallback(section.AkkaConfig)

    let system = ActorSystem.Create ("ClusterSystem", config)
    let actorRef = Props.Create(typeof<SimpleClusterListener>)
    system.ActorOf(actorRef, "clusterListener") |> ignore

let startUp (ports:string list) = ports |> List.iter createActor

[<EntryPoint>]
let main args =
    startUp ["2551" (*; "2552"; "0"*)]
    Console.WriteLine("Press Enter to exit")
    Console.ReadLine() |> ignore
    ExitWithSuccess

App.config

<?xml version="1.0" encoding="utf-8"?>
<configuration>
  <configSections>
    <section name="akka" type="Akka.Configuration.Hocon.AkkaConfigurationSection, Akka"/>
  </configSections>
  <startup>
    <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5.2"/>
  </startup>
  <akka>
    <hocon>
      <![CDATA[
          akka {
            actor {
              provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
            }
            
            remote {
              log-remote-lifecycle-events = DEBUG
              helios.tcp {
                hostname = "localhost"
                port = 0
              }
            }

            cluster {
              seed-nodes = [
                "akka.tcp://ClusterSystem@localhost:2551",
                "akka.tcp://ClusterSystem@localhost:2552"]

              #auto-down-unreachable-after = 30s
            }
          }
      ]]>
    </hocon>
  </akka>
</configuration>