Archive

Akka.NET

Intro

This document captures my understanding of load balancing remote-deployed actors using the documented OOP support in the Akka.NET framework.

Setup

My solution had the following projects:

  • System1
  • System2
  • Shared

System1

The System1 project is an executable that remote-deploys actors to a separate process.

The following reflects how remote actors get deployed onto a separate process:


use system = ActorSystem.Create("system1", config)

let reply = system.ActorOf<ReplyActor>("reply")

let props1 = Props.Create(typeof<SomeActor>, [||])
let props2 = Props.Create(typeof<SomeActor>, [||])
let props3 = Props.Create(typeof<SomeActor>, [||])

let remote1 = system.ActorOf(props1.WithRouter(FromConfig.Instance), "remoteactor1")
let remote2 = system.ActorOf(props2.WithRouter(FromConfig.Instance), "remoteactor2")
let remote3 = system.ActorOf(props3.WithRouter(FromConfig.Instance), "remoteactor3")

In order for this code to execute successfully, the following configuration is required:

        actor {
            provider = "Akka.Remote.RemoteActorRefProvider, Akka.Remote"

            debug {
              receive = on
              autoreceive = on
              lifecycle = on
              event-stream = on
              unhandled = on
            }

            deployment {
                /localactor {
                    router = consistent-hashing-pool
                    nr-of-instances = 5
                    virtual-nodes-factor = 10
                }
                /remoteactor1 {
                    router = consistent-hashing-pool
                    nr-of-instances = 5
                    remote = "akka.tcp://system2@localhost:8080"
                }
                /remoteactor2 {
                    router = consistent-hashing-pool
                    nr-of-instances = 5
                    remote = "akka.tcp://system2@localhost:8080"
                }
                /remoteactor3 {
                    router = consistent-hashing-pool
                    nr-of-instances = 5
                    remote = "akka.tcp://system2@localhost:8080"
                }
            }
        }

After the actors are remote-deployed to the separate process, they can receive messages. Akka.NET supports load balancing by distributing messages amongst actors via router types. This example deploys each remote actor as a consistent-hashing pool (as configured above) and then combines them under a group router called ConsistentHashingGroup. A consistent-hashing router derives a hash key from each message and routes the message to the corresponding routee, so each actor's deployment configuration must declare its router type, as the configuration example above does.

With the configuration above in place, a ConsistentHashingGroup instance can be created and the remote routers registered as its routees:

let hashGroup = system.ActorOf(Props.Empty.WithRouter(ConsistentHashingGroup(config)))
Task.Delay(500).Wait()

let routee1 = Routee.FromActorRef(remote1)
hashGroup.Tell(AddRoutee(routee1))

let routee2 = Routee.FromActorRef(remote2)
hashGroup.Tell(AddRoutee(routee2))

let routee3 = Routee.FromActorRef(remote3)
hashGroup.Tell(AddRoutee(routee3))

Once we have a message type defined that implements IConsistentHashable, we can send messages:

Task.Delay(500).Wait()

for i = 0 to 5 do
    for j = 0 to 7 do
        let message = HashMessage(j, sprintf "remote message: %i" j)
        hashGroup.Tell(message, reply)
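
As an aside, if a message type cannot implement IConsistentHashable, Akka.NET also provides ConsistentHashableEnvelope, which pairs an arbitrary payload with an explicit hash key. A minimal sketch, reusing hashGroup and reply from above (the payload and key values here are illustrative):

open Akka.Routing

// Wrap a payload with an explicit hash key; the router hashes the key
// and delivers only the unwrapped payload to the chosen routee.
let envelope = ConsistentHashableEnvelope("remote message: 3", box 3)
hashGroup.Tell(envelope, reply)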

System2

System2 represents the process that we ultimately want to deploy our actors to, even though we're running on a different process and/or machine. The reason for this is to provide fault tolerance and load balancing.

The following code was written for the System2 executable:

open Akka.Configuration
open Akka.Actor
open System

[<Literal>]
let EndWithSuccess = 0

[<EntryPoint>]
let main argv = 

    let config = ConfigurationFactory.ParseString(@"
        akka {
            log-config-on-start = on
            stdout-loglevel = DEBUG
            loglevel = DEBUG
            actor {
                provider = ""Akka.Remote.RemoteActorRefProvider, Akka.Remote""

                debug {
                  receive = on
                  autoreceive = on
                  lifecycle = on
                  event-stream = on
                  unhandled = on
                }
            }
            remote {
                helios.tcp {
                    port = 8080
                    hostname = localhost
                }
            }
        }
        ")

    use system = ActorSystem.Create("system2", config)
    Console.ReadLine() |> ignore

    EndWithSuccess

Shared

Both System1 and System2 reference the same actor type and message type. In order to remote-deploy an actor and have it receive messages on a separate process, the separate process must know how to construct an instance of that actor as well as the message types it subscribes to. To achieve this, we provide a shared assembly that houses the actor type and the message type.

The following actor type was added to a shared project for both executables (i.e. processes to reference):

open Akka.Actor
open System

type SomeActor() =

    inherit UntypedActor()

    override this.OnReceive(msg:obj) =

        match msg with
        | :? HashMessage as m ->
            let address = this.Self.Path.ToStringWithAddress()
            let content = sprintf "%s got %s" address m.Content
            Console.WriteLine(content)
        | _ -> ()

        let sender = UntypedActor.Context.Sender
        let content = sprintf "Message from %A - %A" sender.Path msg
        System.Console.WriteLine(content)

The following message type was added to a shared project for both executables (i.e. processes to reference):

open Akka.Routing
open System

type HashMessage(id:int, content:string) =

    interface IConsistentHashable with

        member this.ConsistentHashKey:obj = box id

    member val Content = content

Output

The output below reflects the load-balancing of actors based on the message key:

akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor3/$d got remote message: 0
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor2/$b got remote message: 1
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor1/$e got remote message: 3
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor3/$d got remote message: 4
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor3/$d got remote message: 5
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor1/$e got remote message: 3
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor1/$e got remote message: 3
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor1/$e got remote message: 3
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor1/$e got remote message: 3
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor1/$e got remote message: 3
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor3/$d got remote message: 0
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor3/$d got remote message: 4
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor3/$d got remote message: 5
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor3/$d got remote message: 0
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor3/$d got remote message: 4
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor3/$d got remote message: 5
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor3/$d got remote message: 0
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor3/$d got remote message: 4
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor3/$d got remote message: 5
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor3/$d got remote message: 0
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor3/$d got remote message: 4
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor3/$d got remote message: 5
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor3/$d got remote message: 0
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor3/$d got remote message: 4
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor3/$d got remote message: 5
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor2/$b got remote message: 2
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor2/$b got remote message: 6
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor2/$b got remote message: 7
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor2/$b got remote message: 1
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor2/$b got remote message: 2
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor2/$b got remote message: 6
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor2/$b got remote message: 7
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor2/$b got remote message: 1
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor2/$b got remote message: 2
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor2/$b got remote message: 6
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor2/$b got remote message: 7
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor2/$b got remote message: 1
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor2/$b got remote message: 2
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor2/$b got remote message: 6
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor2/$b got remote message: 7
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor2/$b got remote message: 1
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor2/$b got remote message: 2
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor2/$b got remote message: 6
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor2/$b got remote message: 7
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor2/$b got remote message: 1
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor2/$b got remote message: 2
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor2/$b got remote message: 6
akka://system2/remote/akka.tcp/system1@localhost:8090/user/remoteactor2/$b got remote message: 7

Conclusion

This document set out to capture my understanding of load balancing remote-deployed actors using the documented OOP support in the Akka.NET framework. Successfully deploying actors onto a separate process and/or machine requires configuration, a router, and a shared reference to the actor and message types. Once these are established, messages can be routed and processed.

Intro

This post attempts to document my understanding of Akka.Net’s fault-tolerance support.

Fault-tolerance appears to work out-of-the-box via the SupervisorStrategy method of a parent actor.

SupervisorStrategy

The SupervisorStrategy method appears to be a virtual method that allows default behavior to be overridden.

The following method executes the strategy for managing faults within a program's execution:

override this.SupervisorStrategy() =

    let NumberOfRetries = Nullable<int> 3
    let timeSpan = Nullable<TimeSpan> (TimeSpan.FromSeconds(5.0))
    let f = fun (ex:Exception) -> Directive.Stop

    OneForOneStrategy(NumberOfRetries, timeSpan, f) :> SupervisorStrategy


The method above prevents an actor from restarting via the explicit Stop directive. By default, the supervisor strategy uses the Restart directive, so fault tolerance is observed out-of-the-box. Hence, commenting out the method above, or replacing Directive.Stop with Directive.Restart, re-enables fault tolerance.
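
For illustration, the deciding function can also map specific exception types to different directives. The sketch below is my own example, assuming the same OneForOneStrategy overload; the exception cases are not from the original solution:

let NumberOfRetries = Nullable<int> 3
let timeSpan = Nullable<TimeSpan> (TimeSpan.FromSeconds(5.0))

// Map each exception type to a directive; anything unmatched restarts the child.
let decider = fun (ex:Exception) ->
    match ex with
    | :? ArithmeticException -> Directive.Resume     // keep the child's state and continue
    | :? NotSupportedException -> Directive.Stop     // permanently stop the failing child
    | _ -> Directive.Restart                         // default: discard state and restart

OneForOneStrategy(NumberOfRetries, timeSpan, decider) :> SupervisorStrategy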

Therefore, one could comment out the following like this:

//override this.SupervisorStrategy() =

    //let NumberOfRetries = Nullable<int> 3
    //let timeSpan = Nullable<TimeSpan> (TimeSpan.FromSeconds(5.0))
    //let f = fun (ex:Exception) -> Directive.Restart

    //OneForOneStrategy(NumberOfRetries, timeSpan, f) :> SupervisorStrategy


And still achieve default behavior of restarting an actor like this:

override this.SupervisorStrategy() =

    let NumberOfRetries = Nullable<int> 3
    let timeSpan = Nullable<TimeSpan> (TimeSpan.FromSeconds(5.0))
    let f = fun (ex:Exception) -> Directive.Restart

    OneForOneStrategy(NumberOfRetries, timeSpan, f) :> SupervisorStrategy

SimpleClusterListener

SimpleClusterListener is the parent of the Storage actor.

namespace Samples.Cluster.Simple

open System
open Akka.Actor
open Akka.Cluster
open Akka.Event

type SimpleClusterListener() =
    inherit UntypedActor()

    override this.PreStart() =
        let cluster = Cluster.Get(UntypedActor.Context.System)
        let (events:System.Type array) = [| typeof<ClusterEvent.IMemberEvent>
                                            typeof<ClusterEvent.UnreachableMember> |]
        cluster.Subscribe(base.Self, ClusterEvent.InitialStateAsEvents, events)

        let storage = UntypedActor.Context.ActorOf<Storage>("storage")
        storage.Tell(Message "Hello from SimpleClusterListener")

    override this.OnReceive(message:obj) =

        let log = UntypedActor.Context.GetLogger()

        match message with
        | :? ClusterEvent.MemberUp          as e -> log.Info("Member is up: {0}", e.Member)
        | :? ClusterEvent.MemberRemoved     as e -> log.Info("Member is removed: {0}", e.Member)
        | :? ClusterEvent.UnreachableMember as e -> log.Info("Member detected as unreachable: {0}", e.Member)
        | :? string ->
            log.Info(string message)
            this.Sender.Tell(Message "Hello again from SimpleClusterListener")
        | _ -> ()

    override this.PostStop() =
        let cluster = Akka.Cluster.Cluster.Get(UntypedActor.Context.System)
        cluster.Unsubscribe base.Self

    override this.SupervisorStrategy() =

        let NumberOfRetries = Nullable<int> 3
        let timeSpan = Nullable<TimeSpan> (TimeSpan.FromSeconds(5.0))
        let f = fun (ex:Exception) -> Directive.Restart

        OneForOneStrategy(NumberOfRetries, timeSpan, f) :> SupervisorStrategy

Storage Actor

The Storage actor has the ability to throw an exception for its parent (SimpleClusterListener) to manage via its SupervisorStrategy.

namespace Samples.Cluster.Simple

open Akka.Actor
open Akka.Event

type Message = Message of string

type Storage() =

    inherit UntypedActor()

    [<DefaultValue>] static val mutable private counter : int

    override this.OnReceive(message:obj) =
        let log = UntypedActor.Context.GetLogger()

        match message with
        | :? Message as v -> let (Message msg) = v
                             let response = sprintf "Storage Received: %s" msg
                             log.Info(response)
                             this.Sender.Tell(response)

                             Storage.counter <- Storage.counter + 1
                             if Storage.counter = 2
                             then failwith "Testing SupervisorStrategy"
        | _ -> ()

Appendix

The following sections show the remaining pieces of the solution I was working on.

Main

module Program

open System
open System.Configuration
open Akka.Configuration.Hocon
open Akka.Configuration
open Akka.Actor
open Samples.Cluster.Simple

[<Literal>]
let ExitWithSuccess = 0

let createActor port =
    let section = ConfigurationManager.GetSection "akka" :?> AkkaConfigurationSection
    let config =  ConfigurationFactory.ParseString(sprintf "akka.remote.helios.tcp.port=%s" port)
                                      .WithFallback(section.AkkaConfig)

    let system = ActorSystem.Create ("ClusterSystem", config)
    let props = Props.Create(typeof<SimpleClusterListener>)
    system.ActorOf(props, "clusterListener") |> ignore

let startUp (ports:string list) = ports |> List.iter createActor

[<EntryPoint>]
let main args =
    startUp ["2551" (*; "2552"; "0"*)]
    Console.WriteLine("Press any key to exit")
    Console.ReadLine() |> ignore
    ExitWithSuccess

App.config

<?xml version="1.0" encoding="utf-8"?>
<configuration>
  <configSections>
    <section name="akka" type="Akka.Configuration.Hocon.AkkaConfigurationSection, Akka"/>
  </configSections>
  <startup>
    <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5.2"/>
  </startup>
  <akka>
    <hocon>
      <![CDATA[
          akka {
            actor {
              provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
            }
            
            remote {
              log-remote-lifecycle-events = DEBUG
              helios.tcp {
                hostname = "localhost"
                port = 0
              }
            }

            cluster {
              seed-nodes = [
                "akka.tcp://ClusterSystem@localhost:2551",
                "akka.tcp://ClusterSystem@localhost:2552"]

              #auto-down-unreachable-after = 30s
            }
          }
      ]]>
    </hocon>
  </akka>
</configuration>

Intro

This post is intended to document my current understanding of the Cluster.Sharding feature within Akka.NET. This feature is in pre-release as of 3/17/2017.

I identified from the documentation that a cluster represents a fault-tolerant, elastic, decentralized, peer-to-peer network of programs. The Akka.NET framework provides a library that supports creating and managing a cluster.

Akka.Cluster.Sharding

Akkling.Cluster.Sharding is an open-source library that provides F# support for Akka.NET's cluster-sharding feature. I discovered this library after posting questions on Stack Overflow.

Within Akka.NET, there are four terms that are essential for understanding how to implement cluster sharding:

  • Node
  • Shard
  • ShardRegion
  • Entity

What is a Node?

I assume a node is a service, identified by the combination of a server address and port number, that contains one or more shards.

What is a Shard?

A shard is a group of entities that are managed together within a clustered system.

What is a ShardRegion?

A shard region is responsible for locating the shard of an entity as well as delivering messages to that entity. If an entity is not available to receive a message, the shard region will spawn a new entity so that it can deliver the message.

What is an Entity?

An entity is essentially an actor that belongs to a cluster. However, in order to locate an entity, the shard region of that entity needs to be identified.
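
To make these terms concrete, Akka.Cluster.Sharding resolves the shard id and entity id of each message through a message extractor. The following is a minimal sketch of my own, not part of any sample: it assumes each message is a (shardId, entityId, payload) tuple, mirroring the envelopes used in the sample code below.

open Akka.Cluster.Sharding

// Hypothetical extractor (illustration only): assumes each message arrives
// as a (shardId, entityId, payload) tuple.
type TupleMessageExtractor() =
    interface IMessageExtractor with
        member this.ShardId(message:obj) =
            match message with
            | :? (string * string * string) as env -> let (shardId, _, _) = env in shardId
            | _ -> null
        member this.EntityId(message:obj) =
            match message with
            | :? (string * string * string) as env -> let (_, entityId, _) = env in entityId
            | _ -> null
        member this.EntityMessage(message:obj) =
            match message with
            | :? (string * string * string) as env -> let (_, _, payload) = env in box payload
            | _ -> null

In the Akkling sample below, this machinery appears to be hidden: the messages sent to the shard regions already carry the (shardId, entityId, payload) shape, and the id function passed to spawnSharded maps each message to itself.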

Sample Code

I took the sample code from the Akkling repo and made minor changes:

open System
open System.IO
#if INTERACTIVE
let cd = Path.Combine(__SOURCE_DIRECTORY__, "../src/Akkling.Cluster.Sharding/bin/Debug")
System.IO.Directory.SetCurrentDirectory(cd)
#endif

#r "../src/Akkling.Cluster.Sharding/bin/Debug/System.Collections.Immutable.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Hyperion.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Newtonsoft.Json.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/FSharp.PowerPack.Linq.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Helios.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/FsPickler.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Google.ProtocolBuffers.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Google.ProtocolBuffers.Serialization.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.Remote.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Google.ProtocolBuffers.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.Persistence.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.Cluster.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.Cluster.Tools.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.Cluster.Sharding.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.Serialization.Hyperion.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akkling.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akkling.Persistence.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akkling.Cluster.Sharding.dll"

open Akka.Actor
open Akka.Configuration
open Akka.Cluster
open Akka.Cluster.Tools.Singleton
open Akka.Cluster.Sharding
open Akka.Persistence

open Akkling
open Akkling.Persistence
open Akkling.Cluster
open Akkling.Cluster.Sharding
open Hyperion

let configWithPort port =
    let config = Configuration.parse ("""
        akka {
            actor {
              provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
              serializers {
                hyperion = "Akka.Serialization.HyperionSerializer, Akka.Serialization.Hyperion"
              }
              serialization-bindings {
                "System.Object" = hyperion
              }
            }
          remote {
            helios.tcp {
              public-hostname = "localhost"
              hostname = "localhost"
              port = """ + port.ToString() + """
            }
          }
          cluster {
            auto-down-unreachable-after = 5s
            seed-nodes = [ "akka.tcp://cluster-system@localhost:2551/" ]
          }
          persistence {
            journal.plugin = "akka.persistence.journal.inmem"
            snapshot-store.plugin = "akka.persistence.snapshot-store.local"
          }
        }
        """)
    config.WithFallback(ClusterSingletonManager.DefaultConfig())
 
let consumer (actor:Actor<_>) msg = printfn "\n%A received %s" (actor.Self.Path.ToStringWithAddress()) msg |> ignored
 
// spawn two separate systems with shard regions on each of them
 
let system1 = System.create "cluster-system" (configWithPort 2551)
let shardRegion1 = spawnSharded id system1 "printer" <| props (actorOf2 consumer)
 
// wait a while before starting a second system
System.Threading.Thread.Sleep(3000)
let system2 = System.create "cluster-system" (configWithPort 2552)
let shardRegion2 = spawnSharded id system2 "printer" <| props (actorOf2 consumer)
 
// send hello world to entities on 4 different shards (this means that we will have 4 entities in total)
// NOTE: even though we sent all messages through a single shard region,
//       some of them will be executed on the second one thanks to shard balancing
System.Threading.Thread.Sleep(3000)
shardRegion1 <! ("shard-1", "entity-1", "hello world 1")
shardRegion1 <! ("shard-2", "entity-2", "hello world 2")
shardRegion1 <! ("shard-1", "entity-3", "hello world 3")
shardRegion1 <! ("shard-2", "entity-4", "hello world 4")
 
// check which shards have been build on the second shard region
 
System.Threading.Thread.Sleep(3000)
 
open Akka.Cluster.Sharding
 
let printShards shardRegion =
    async {
        let! reply = (retype shardRegion) <? GetShardRegionStats.Instance
        let (stats: ShardRegionStats) = reply.Value
        for kv in stats.Stats do
            printfn "\tShard '%s' has %d entities on it" kv.Key kv.Value
    } |> Async.RunSynchronously
 
printfn "\nShards active on node 'localhost:2551':"
printShards shardRegion1
printfn "\nShards active on node 'localhost:2552':"
printShards shardRegion2

The output is the following:

Binding session to 'C:\Users\Snimrod\Desktop\Akkling-master\examples\../src/Akkling.Cluster.Sharding/bin/Debug/Akka.dll'...
Binding session to 'C:\Users\Snimrod\Desktop\Akkling-master\examples\../src/Akkling.Cluster.Sharding/bin/Debug/System.Collections.Immutable.dll'...
[INFO][3/15/2017 2:58:16 PM][Thread 0001][remoting] Starting remoting
[INFO][3/15/2017 2:58:16 PM][Thread 0001][remoting] Remoting started; listening on addresses : [akka.tcp://cluster-system@localhost:2551]
[INFO][3/15/2017 2:58:16 PM][Thread 0001][remoting] Remoting now listens on addresses: [akka.tcp://cluster-system@localhost:2551]
[INFO][3/15/2017 2:58:16 PM][Thread 0001][Cluster] Cluster Node [akka.tcp://cluster-system@localhost:2551] - Starting up...
[INFO][3/15/2017 2:58:16 PM][Thread 0001][Cluster] Cluster Node [akka.tcp://cluster-system@localhost:2551] - Started up successfully
[INFO][3/15/2017 2:58:16 PM][Thread 0007][[akka://cluster-system/system/cluster/core/daemon#1110691120]] Node [akka.tcp://cluster-system@localhost:2551] is JOINING, roles []
[INFO][3/15/2017 2:58:16 PM][Thread 0007][[akka://cluster-system/system/cluster/core/daemon#1110691120]] Leader is moving node [akka.tcp://cluster-system@localhost:2551] to [Up]
[INFO][3/15/2017 2:58:16 PM][Thread 0007][akka://cluster-system/user/sharding/printerCoordinator/singleton/coordinator] Message Register from akka://cluster-system/user/sharding/printer to akka://cluster-system/user/sharding/printerCoordinator/singleton/coordinator was not delivered. 1 dead letters encountered.
[INFO][3/15/2017 2:58:16 PM][Thread 0004][[akka://cluster-system/user/sharding/printerCoordinator#643222226]] Singleton manager [akka.tcp://cluster-system@localhost:2551] starting singleton actor
[INFO][3/15/2017 2:58:16 PM][Thread 0004][[akka://cluster-system/user/sharding/printerCoordinator#643222226]] ClusterSingletonManager state change [Start -> Oldest] Akka.Cluster.Tools.Singleton.Uninitialized
[INFO][3/15/2017 2:58:19 PM][Thread 0001][remoting] Starting remoting
[INFO][3/15/2017 2:58:19 PM][Thread 0001][remoting] Remoting started; listening on addresses : [akka.tcp://cluster-system@localhost:2552]
[INFO][3/15/2017 2:58:19 PM][Thread 0001][remoting] Remoting now listens on addresses: [akka.tcp://cluster-system@localhost:2552]
[INFO][3/15/2017 2:58:19 PM][Thread 0001][Cluster] Cluster Node [akka.tcp://cluster-system@localhost:2552] - Starting up...
[INFO][3/15/2017 2:58:19 PM][Thread 0001][Cluster] Cluster Node [akka.tcp://cluster-system@localhost:2552] - Started up successfully
[INFO][3/15/2017 2:58:19 PM][Thread 0008][[akka://cluster-system/system/cluster/core/daemon#1110691120]] Node [akka.tcp://cluster-system@localhost:2552] is JOINING, roles []
[INFO][3/15/2017 2:58:20 PM][Thread 0004][[akka://cluster-system/system/cluster/core/daemon#205575170]] Welcome from [akka.tcp://cluster-system@localhost:2551]
[INFO][3/15/2017 2:58:20 PM][Thread 0006][[akka://cluster-system/system/cluster/core/daemon#1110691120]] Leader is moving node [akka.tcp://cluster-system@localhost:2552] to [Up]
[INFO][3/15/2017 2:58:20 PM][Thread 0008][[akka://cluster-system/user/sharding/printerCoordinator#1529090680]] ClusterSingletonManager state change [Start -> Younger] Akka.Cluster.Tools.Singleton.Uninitialized

"akka://cluster-system/user/sharding/printer/shard-1/entity-1" received hello world 1

"akka://cluster-system/user/sharding/printer/shard-2/entity-2" received hello world 2

"akka://cluster-system/user/sharding/printer/shard-1/entity-3" received hello world 3

"akka://cluster-system/user/sharding/printer/shard-2/entity-4" received hello world 4

Shards active on node 'localhost:2551':
	Shard 'shard-2' has 2 entities on it

Shards active on node 'localhost:2552':
	Shard 'shard-1' has 2 entities on it

The code that does messaging was the following:

shardRegion1 <! ("shard-1", "entity-1", "hello world 1")
shardRegion1 <! ("shard-2", "entity-2", "hello world 2")
shardRegion1 <! ("shard-1", "entity-3", "hello world 3")
shardRegion1 <! ("shard-2", "entity-4", "hello world 4")

Note how the lines above all address the same shard region (i.e. shardRegion1). However, shard balancing will attempt to dispatch messages such that they are evenly distributed across shards.

In addition, if we append shard-3 below:

System.Threading.Thread.Sleep(3000)
shardRegion1 <! ("shard-1", "entity-1", "hello world 1")
shardRegion1 <! ("shard-2", "entity-2", "hello world 2")
shardRegion1 <! ("shard-1", "entity-3", "hello world 3")
shardRegion1 <! ("shard-2", "entity-4", "hello world 4")

shardRegion1 <! ("shard-3", "entity-5", "hello world 4")

Then we can observe an update within our nodes on the cluster:

"akka://cluster-system/user/sharding/printer/shard-1/entity-3" received hello world 3

"akka://cluster-system/user/sharding/printer/shard-3/entity-5" received hello world 4

"akka://cluster-system/user/sharding/printer/shard-1/entity-1" received hello world 1

"akka://cluster-system/user/sharding/printer/shard-2/entity-2" received hello world 2

"akka://cluster-system/user/sharding/printer/shard-2/entity-4" received hello world 4

Shards active on node 'localhost:2551':
	Shard 'shard-3' has 1 entities on it
	Shard 'shard-1' has 2 entities on it

Shards active on node 'localhost:2552':
	Shard 'shard-2' has 2 entities on it

Conclusion

This post walked through the pre-release Cluster.Sharding feature of Akka.NET via the Akkling sample: shard regions spawn entities on demand, and shard balancing distributes shards (and their entities) across the nodes of the cluster.