F#: Akka.NET (Cluster.Sharding)

Intro

This post documents my current understanding of the Cluster.Sharding feature in Akka.NET. This feature is in pre-release as of 3/17/2017.

From the documentation I learned that a cluster is a fault-tolerant, elastic, decentralized, peer-to-peer network of programs. The Akka.NET framework provides a library for creating and managing a cluster.

Akka.Cluster.Sharding

Akkling.Cluster.Sharding is an open-source library that provides F# support for an Akka.NET cluster. I discovered this library after posting questions on Stack Overflow.

Four terms are essential for understanding how to implement a cluster using Akka.NET:

  • Node
  • Shard
  • ShardRegion
  • Entity

What is a Node?

I assume a node is a service, identified by the combination of a server address and port number, that hosts one or more shards.

What is a Shard?

A shard is a group of entities that are managed together within a clustered system.

What is a ShardRegion?

A shard region is responsible for locating the shard of an entity as well as delivering messages to an entity. In the event that an entity is not available to receive a message, the ShardRegion will spawn a new entity so that it can deliver its message.
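To make the "locate the shard, then deliver, spawning the entity if needed" behaviour concrete, here is a toy model in plain F#. It is only a sketch of the idea: ToyEntity and ToyShardRegion are made-up names, nothing here uses Akka.NET or Akkling, and a real shard region also handles remote nodes, buffering and supervision.

```fsharp
open System.Collections.Generic

// Toy stand-in for an entity: it only remembers the messages it received.
type ToyEntity(entityId: string) =
    let inbox = ResizeArray<string>()
    member this.Id = entityId
    member this.Receive(msg: string) = inbox.Add(msg)
    member this.Received = List.ofSeq inbox

// Toy stand-in for a shard region: shardId -> (entityId -> entity).
type ToyShardRegion() =
    let shards = Dictionary<string, Dictionary<string, ToyEntity>>()

    // Locate the shard, create the entity if it does not exist yet, then deliver.
    member this.Deliver(shardId: string, entityId: string, msg: string) =
        let shard =
            match shards.TryGetValue(shardId) with
            | true, existing -> existing
            | false, _ ->
                let created = Dictionary<string, ToyEntity>()
                shards.[shardId] <- created
                created
        let entity =
            match shard.TryGetValue(entityId) with
            | true, existing -> existing
            | false, _ ->
                let created = ToyEntity(entityId)
                shard.[entityId] <- created
                created
        entity.Receive(msg)

    // Number of entities per shard.
    member this.Stats =
        shards |> Seq.map (fun kv -> kv.Key, kv.Value.Count) |> Map.ofSeq

    // Messages received so far by one entity.
    member this.MessagesOf(shardId: string, entityId: string) =
        shards.[shardId].[entityId].Received

let region = ToyShardRegion()
region.Deliver("shard-1", "entity-1", "hello world 1")
region.Deliver("shard-1", "entity-3", "hello world 3")
printfn "%A" region.Stats
```

The second Deliver call reuses the existing "shard-1" and only creates the missing entity, which is the part of the ShardRegion behaviour this sketch is meant to show.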

What is an Entity?

An entity is essentially an actor that belongs to a cluster. However, in order to locate an entity, the shard region of that entity needs to be identified.
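In the sample output later in this post, every entity shows up under a path made of the actor system name, the shard region name, the shard id and the entity id. The helper below is hypothetical (it is not part of Akka.NET or Akkling) and only rebuilds that path shape as I observed it in the logs, to show which pieces are needed to address an entity.

```fsharp
// Hypothetical helper: rebuilds the local actor path shape seen in the
// sample output of this post for a sharded entity.
let entityPath (systemName: string) (regionName: string) (shardId: string) (entityId: string) =
    sprintf "akka://%s/user/sharding/%s/%s/%s" systemName regionName shardId entityId

// prints: akka://cluster-system/user/sharding/printer/shard-1/entity-1
printfn "%s" (entityPath "cluster-system" "printer" "shard-1" "entity-1")
```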

Sample Code

I took the sample code from the Akkling repo and made minor changes:

open System
open System.IO
#if INTERACTIVE
let cd = Path.Combine(__SOURCE_DIRECTORY__, "../src/Akkling.Cluster.Sharding/bin/Debug")
System.IO.Directory.SetCurrentDirectory(cd)
#endif

#r "../src/Akkling.Cluster.Sharding/bin/Debug/System.Collections.Immutable.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Hyperion.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Newtonsoft.Json.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/FSharp.PowerPack.Linq.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Helios.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/FsPickler.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Google.ProtocolBuffers.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Google.ProtocolBuffers.Serialization.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.Remote.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Google.ProtocolBuffers.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.Persistence.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.Cluster.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.Cluster.Tools.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.Cluster.Sharding.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akka.Serialization.Hyperion.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akkling.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akkling.Persistence.dll"
#r "../src/Akkling.Cluster.Sharding/bin/Debug/Akkling.Cluster.Sharding.dll"

open Akka.Actor
open Akka.Configuration
open Akka.Cluster
open Akka.Cluster.Tools.Singleton
open Akka.Cluster.Sharding
open Akka.Persistence

open Akkling
open Akkling.Persistence
open Akkling.Cluster
open Akkling.Cluster.Sharding
open Hyperion

let configWithPort port =
    let config = Configuration.parse ("""
        akka {
            actor {
              provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
              serializers {
                hyperion = "Akka.Serialization.HyperionSerializer, Akka.Serialization.Hyperion"
              }
              serialization-bindings {
                "System.Object" = hyperion
              }
            }
          remote {
            helios.tcp {
              public-hostname = "localhost"
              hostname = "localhost"
              port = """ + port.ToString() + """
            }
          }
          cluster {
            auto-down-unreachable-after = 5s
            seed-nodes = [ "akka.tcp://cluster-system@localhost:2551/" ]
          }
          persistence {
            journal.plugin = "akka.persistence.journal.inmem"
            snapshot-store.plugin = "akka.persistence.snapshot-store.local"
          }
        }
        """)
    config.WithFallback(ClusterSingletonManager.DefaultConfig())
 
let consumer (actor:Actor<_>) msg = printfn "\n%A received %s" (actor.Self.Path.ToStringWithAddress()) msg |> ignored
 
// spawn two separate systems with shard regions on each of them
 
let system1 = System.create "cluster-system" (configWithPort 2551)
let shardRegion1 = spawnSharded id system1 "printer" <| props (actorOf2 consumer)
 
// wait a while before starting a second system
System.Threading.Thread.Sleep(3000)
let system2 = System.create "cluster-system" (configWithPort 2552)
let shardRegion2 = spawnSharded id system2 "printer" <| props (actorOf2 consumer)
 
// send hello world to 4 entities spread across 2 shards
// NOTE: even though we send all messages through a single shard region,
//       some of them will be handled on the second one thanks to shard balancing
System.Threading.Thread.Sleep(3000)
shardRegion1 <! ("shard-1", "entity-1", "hello world 1")
shardRegion1 <! ("shard-2", "entity-2", "hello world 2")
shardRegion1 <! ("shard-1", "entity-3", "hello world 3")
shardRegion1 <! ("shard-2", "entity-4", "hello world 4")
 
// check which shards have been built on each shard region
 
System.Threading.Thread.Sleep(3000)
 
open Akka.Cluster.Sharding
 
let printShards shardRegion =
    async {
        let! reply = (retype shardRegion) <? GetShardRegionStats.Instance         
        let (stats: ShardRegionStats) = reply.Value         
        for kv in stats.Stats do             
            printfn "\tShard '%s' has %d entities on it" kv.Key kv.Value     
    } |> Async.RunSynchronously
 
printfn "\nShards active on node 'localhost:2551':"
printShards shardRegion1
printfn "\nShards active on node 'localhost:2552':"
printShards shardRegion2

The output is the following:

Binding session to 'C:\Users\Snimrod\Desktop\Akkling-master\examples\../src/Akkling.Cluster.Sharding/bin/Debug/Akka.dll'...
Binding session to 'C:\Users\Snimrod\Desktop\Akkling-master\examples\../src/Akkling.Cluster.Sharding/bin/Debug/System.Collections.Immutable.dll'...
[INFO][3/15/2017 2:58:16 PM][Thread 0001][remoting] Starting remoting
[INFO][3/15/2017 2:58:16 PM][Thread 0001][remoting] Remoting started; listening on addresses : [akka.tcp://cluster-system@localhost:2551]
[INFO][3/15/2017 2:58:16 PM][Thread 0001][remoting] Remoting now listens on addresses: [akka.tcp://cluster-system@localhost:2551]
[INFO][3/15/2017 2:58:16 PM][Thread 0001][Cluster] Cluster Node [akka.tcp://cluster-system@localhost:2551] - Starting up...
[INFO][3/15/2017 2:58:16 PM][Thread 0001][Cluster] Cluster Node [akka.tcp://cluster-system@localhost:2551] - Started up successfully
[INFO][3/15/2017 2:58:16 PM][Thread 0007][[akka://cluster-system/system/cluster/core/daemon#1110691120]] Node [akka.tcp://cluster-system@localhost:2551] is JOINING, roles []
[INFO][3/15/2017 2:58:16 PM][Thread 0007][[akka://cluster-system/system/cluster/core/daemon#1110691120]] Leader is moving node [akka.tcp://cluster-system@localhost:2551] to [Up]
[INFO][3/15/2017 2:58:16 PM][Thread 0007][akka://cluster-system/user/sharding/printerCoordinator/singleton/coordinator] Message Register from akka://cluster-system/user/sharding/printer to akka://cluster-system/user/sharding/printerCoordinator/singleton/coordinator was not delivered. 1 dead letters encountered.
[INFO][3/15/2017 2:58:16 PM][Thread 0004][[akka://cluster-system/user/sharding/printerCoordinator#643222226]] Singleton manager [akka.tcp://cluster-system@localhost:2551] starting singleton actor
[INFO][3/15/2017 2:58:16 PM][Thread 0004][[akka://cluster-system/user/sharding/printerCoordinator#643222226]] ClusterSingletonManager state change [Start -> Oldest] Akka.Cluster.Tools.Singleton.Uninitialized
[INFO][3/15/2017 2:58:19 PM][Thread 0001][remoting] Starting remoting
[INFO][3/15/2017 2:58:19 PM][Thread 0001][remoting] Remoting started; listening on addresses : [akka.tcp://cluster-system@localhost:2552]
[INFO][3/15/2017 2:58:19 PM][Thread 0001][remoting] Remoting now listens on addresses: [akka.tcp://cluster-system@localhost:2552]
[INFO][3/15/2017 2:58:19 PM][Thread 0001][Cluster] Cluster Node [akka.tcp://cluster-system@localhost:2552] - Starting up...
[INFO][3/15/2017 2:58:19 PM][Thread 0001][Cluster] Cluster Node [akka.tcp://cluster-system@localhost:2552] - Started up successfully
[INFO][3/15/2017 2:58:19 PM][Thread 0008][[akka://cluster-system/system/cluster/core/daemon#1110691120]] Node [akka.tcp://cluster-system@localhost:2552] is JOINING, roles []
[INFO][3/15/2017 2:58:20 PM][Thread 0004][[akka://cluster-system/system/cluster/core/daemon#205575170]] Welcome from [akka.tcp://cluster-system@localhost:2551]
[INFO][3/15/2017 2:58:20 PM][Thread 0006][[akka://cluster-system/system/cluster/core/daemon#1110691120]] Leader is moving node [akka.tcp://cluster-system@localhost:2552] to [Up]
[INFO][3/15/2017 2:58:20 PM][Thread 0008][[akka://cluster-system/user/sharding/printerCoordinator#1529090680]] ClusterSingletonManager state change [Start -> Younger] Akka.Cluster.Tools.Singleton.Uninitialized

"akka://cluster-system/user/sharding/printer/shard-1/entity-1" received hello world 1

"akka://cluster-system/user/sharding/printer/shard-2/entity-2" received hello world 2

"akka://cluster-system/user/sharding/printer/shard-1/entity-3" received hello world 3

"akka://cluster-system/user/sharding/printer/shard-2/entity-4" received hello world 4

Shards active on node 'localhost:2551':
	Shard 'shard-2' has 2 entities on it

Shards active on node 'localhost:2552':
	Shard 'shard-1' has 2 entities on it

The code that does messaging was the following:

shardRegion1 <! ("shard-1", "entity-1", "hello world 1")
shardRegion1 <! ("shard-2", "entity-2", "hello world 2")
shardRegion1 <! ("shard-1", "entity-3", "hello world 3")
shardRegion1 <! ("shard-2", "entity-4", "hello world 4")

Note how the lines above all send through the same shard region (shardRegion1). However, shard balancing distributes the shards across the nodes of the cluster, so some of the messages end up being handled by entities on the second node.

In addition, if we send one more message to a new shard, shard-3, as in the last line below:
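My mental model of the balancing is "put a new shard on the node that currently hosts the fewest shards". The sketch below is plain F# and is not the Akka.NET implementation; the allocate function and the node names are assumptions made for illustration, and in a real cluster the placement is decided by the shard coordinator, so which shard lands on which node can differ from this toy.

```fsharp
// Toy allocation table: shardId -> node. Not the Akka.NET implementation.
let allocate (nodes: string list) (allocations: Map<string, string>) (shardId: string) =
    match Map.tryFind shardId allocations with
    | Some _ -> allocations // the shard already has a home, keep it
    | None ->
        // count how many shards a node hosts right now
        let shardCount node =
            allocations |> Map.filter (fun _ owner -> owner = node) |> Map.count
        // pick a node with the fewest shards
        let target = nodes |> List.minBy shardCount
        Map.add shardId target allocations

let nodes = [ "localhost:2551"; "localhost:2552" ]

// the shard ids of the five messages sent in this post, in order
let allocations =
    [ "shard-1"; "shard-2"; "shard-1"; "shard-2"; "shard-3" ]
    |> List.fold (allocate nodes) Map.empty

allocations |> Map.iter (printfn "%s -> %s")
```

The point of the sketch is that the sender never chooses a node: it only names a shard, and the shard is placed wherever there is the most room.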

System.Threading.Thread.Sleep(3000)
shardRegion1 <! ("shard-1", "entity-1", "hello world 1")
shardRegion1 <! ("shard-2", "entity-2", "hello world 2")
shardRegion1 <! ("shard-1", "entity-3", "hello world 3")
shardRegion1 <! ("shard-2", "entity-4", "hello world 4")

shardRegion1 <! ("shard-3", "entity-5", "hello world 4")

Then we can observe that the new shard has been allocated to one of the nodes in the cluster:

"akka://cluster-system/user/sharding/printer/shard-1/entity-3" received hello world 3

"akka://cluster-system/user/sharding/printer/shard-3/entity-5" received hello world 4

"akka://cluster-system/user/sharding/printer/shard-1/entity-1" received hello world 1

"akka://cluster-system/user/sharding/printer/shard-2/entity-2" received hello world 2

"akka://cluster-system/user/sharding/printer/shard-2/entity-4" received hello world 4

Shards active on node 'localhost:2551':
	Shard 'shard-3' has 1 entities on it
	Shard 'shard-1' has 2 entities on it

Shards active on node 'localhost:2552':
	Shard 'shard-2' has 2 entities on it
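The sample passes id as the first argument of spawnSharded, which is why every message has to be a (shard id, entity id, payload) tuple already. I assume that any function producing such a tuple could be passed instead, so a message could carry its own routing data. The sketch below shows only that function in plain F#; PrintJob, stableHash and extract are hypothetical names, and the commented-out spawnSharded call is an untested assumption about how it would be wired in.

```fsharp
// Hypothetical message type; not taken from the Akkling sample.
type PrintJob = { CustomerId: string; Text: string }

// Deterministic string hash, so the same id maps to the same shard on every
// node and every run (String.GetHashCode can differ between processes).
let stableHash (s: string) =
    s |> Seq.fold (fun acc c -> (acc * 31 + int c) % 1000003) 7

// Maps a message to (shardId, entityId, payload), the same tuple shape
// that the sample sends directly.
let extract (shardCount: int) (job: PrintJob) =
    let shardId = sprintf "shard-%d" (stableHash job.CustomerId % shardCount + 1)
    shardId, job.CustomerId, job.Text

// Assumed usage, mirroring the sample's `spawnSharded id ...` call:
// let region = spawnSharded (extract 10) system1 "printer" <| props (actorOf2 consumer)
// region <! { CustomerId = "alice"; Text = "hello world" }

printfn "%A" (extract 10 { CustomerId = "alice"; Text = "hello world" })
```

With this shape, all messages for one customer reach the same entity, and the number of shards is fixed up front by shardCount.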

Conclusion

The End.
