How to make a simple application with Akka Cluster
If you read my previous story about Scalachain, you probably noticed that it is far from being a distributed system. It lacks all the features to work properly with other nodes. Add to that the fact that a blockchain composed of a single node is useless. For this reason I decided it is time to work on the issue.
Since Scalachain is powered by Akka, why not take the chance to play with Akka Cluster? I created a simple project to tinker a bit with Akka Cluster, and in this story I'm going to share my learnings. We are going to create a cluster of three nodes, using Cluster Aware Routers to balance the load among them. Everything will run in a Docker container, and we will use docker-compose for an easy deployment.
Ok, let's roll!
A quick introduction to Akka Cluster
Akka Cluster provides great support for the creation of distributed applications. The best use case is when you have a node that you want to replicate N times in a distributed environment. This means that all the N nodes are peers running the same code. Akka Cluster gives you out of the box the discovery of members in the same cluster. Using Cluster Aware Routers it is possible to balance the messages between actors in different nodes. It is also possible to choose the balancing policy, making load balancing a piece of cake!
Actually you can choose between two types of routers:
Group Router - the actors to send the messages to - called routees - are specified using their actor paths. The routers share the routees created in the cluster. We will use a Group Router in this example.

Pool Router - the routees are created and deployed by the router, so they are its children in the actor hierarchy. Routees are not shared between routers. This is ideal for a primary-replica scenario, where each router is the primary and its routees the replicas.

This is just the tip of the iceberg, so I invite you to read the official documentation for more insights.
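To give a feel for the difference, a Pool Router deployment could be configured roughly like this. This is a hedged sketch, not taken from this project: the path /node/poolRouter and the instance count are made up for illustration.

```hocon
akka.actor.deployment {
  /node/poolRouter {
    # a pool router creates its routees itself, as its own children
    router = round-robin-pool
    cluster {
      enabled = on
      max-nr-of-instances-per-node = 3
      allow-local-routees = on
    }
  }
}
```

Compare this with the group router configuration used later in this story, where routees.paths points at actors that already exist in the cluster.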
A Cluster for Mathematical Computations
Let's imagine a use case. Suppose we have to design a system to execute mathematical computations on demand. The system is deployed online, so it needs a REST API to receive the computation requests. An internal processor handles these requests, executing the computation and returning the result.
Right now the processor can only compute Fibonacci numbers. We decide to use a cluster of nodes to distribute the load among them and improve the performance. Akka Cluster will handle the cluster dynamics and the load balancing between nodes. Sounds good!
Actor hierarchy
First things first: we need to define our actor hierarchy. The system can be divided into three functional parts: the business logic, the cluster management, and the node itself. There is also the server, but it is not an actor, and we will work on it later.
Business logic
The application should perform mathematical computations. We can define a simple Processor actor to manage all the computational tasks. Every computation that we support can be implemented in a specific actor, which will be a child of the Processor one. This way the application is modular and easier to extend and maintain. Right now the only child of Processor is the ProcessorFibonacci actor. I suppose you can guess what its task is. This should be enough to start.
Cluster management
To manage the cluster we need a ClusterManager. Sounds simple, right? This actor handles everything related to the cluster, like returning its members when asked. It would be useful to log what happens inside the cluster, so we define a ClusterListener actor. This is a child of the ClusterManager, and it subscribes to cluster events, logging them.
Node
The Node actor is the root of our hierarchy. It is the entry point of our system, the one communicating with the API. The Processor and the ClusterManager are its children, along with the ProcessorRouter actor. This is the load balancer of the system, distributing the load among the Processors. We will configure it as a Cluster Aware Router, so every ProcessorRouter can send messages to the Processors on every node.

Implementation of the actors
Time to implement our actors! First we implement the actors related to the business logic of the system. We then move on to the actors for the cluster management, and finally the root actor (Node).
ProcessorFibonacci
This actor executes the computation of the Fibonacci number. It receives a Compute message containing the number to compute and the reference of the actor to reply to. The reference is important, since there can be different requesting actors. Remember that we are working in a distributed environment!
Once the Compute message is received, the fibonacci function computes the result. We wrap it in a ProcessorResponse object to provide information about the node that executed the computation. This will be useful later to see the round-robin policy in action.
The result is then sent to the actor we should reply to. Easy.
object ProcessorFibonacci {
  sealed trait ProcessorFibonacciMessage
  case class Compute(n: Int, replyTo: ActorRef) extends ProcessorFibonacciMessage

  def props(nodeId: String) = Props(new ProcessorFibonacci(nodeId))

  def fibonacci(x: Int): BigInt = {
    @tailrec def fibHelper(x: Int, prev: BigInt = 0, next: BigInt = 1): BigInt = x match {
      case 0 => prev
      case 1 => next
      case _ => fibHelper(x - 1, next, next + prev)
    }
    fibHelper(x)
  }
}

class ProcessorFibonacci(nodeId: String) extends Actor {
  import ProcessorFibonacci._

  override def receive: Receive = {
    case Compute(value, replyTo) => {
      replyTo ! ProcessorResponse(nodeId, fibonacci(value))
    }
  }
}
Processor
The Processor actor manages the specific sub-processors, like the Fibonacci one. It should instantiate the sub-processors and forward the requests to them. Right now we only have one sub-processor, so the Processor receives one kind of message: ComputeFibonacci. This message contains the Fibonacci number to compute. Once it is received, the number to compute is sent to the FibonacciProcessor, along with the reference of the sender().
object Processor {
  sealed trait ProcessorMessage
  case class ComputeFibonacci(n: Int) extends ProcessorMessage

  def props(nodeId: String) = Props(new Processor(nodeId))
}

class Processor(nodeId: String) extends Actor {
  import Processor._

  val fibonacciProcessor: ActorRef = context.actorOf(ProcessorFibonacci.props(nodeId), "fibonacci")

  override def receive: Receive = {
    case ComputeFibonacci(value) => {
      val replyTo = sender()
      fibonacciProcessor ! Compute(value, replyTo)
    }
  }
}
ClusterListener
We want to log useful information about what happens in the cluster. This could help us debug the system if we need to. This is the purpose of the ClusterListener actor. Before starting, it subscribes itself to the event messages of the cluster. The actor reacts to messages like MemberUp, UnreachableMember, or MemberRemoved, logging the corresponding event. When the ClusterListener is stopped, it unsubscribes itself from the cluster events.
object ClusterListener {
  def props(nodeId: String, cluster: Cluster) = Props(new ClusterListener(nodeId, cluster))
}

class ClusterListener(nodeId: String, cluster: Cluster) extends Actor with ActorLogging {

  override def preStart(): Unit = {
    cluster.subscribe(self, initialStateMode = InitialStateAsEvents,
      classOf[MemberEvent], classOf[UnreachableMember])
  }

  override def postStop(): Unit = cluster.unsubscribe(self)

  def receive = {
    case MemberUp(member) =>
      log.info("Node {} - Member is Up: {}", nodeId, member.address)
    case UnreachableMember(member) =>
      log.info("Node {} - Member detected as unreachable: {}", nodeId, member)
    case MemberRemoved(member, previousStatus) =>
      log.info("Node {} - Member is Removed: {} after {}", nodeId, member.address, previousStatus)
    case _: MemberEvent => // ignore
  }
}
ClusterManager
The actor responsible for the management of the cluster is the ClusterManager. It creates the ClusterListener actor, and provides the list of cluster members upon request. It could be extended to add more functionalities, but right now this is enough.
object ClusterManager {
  sealed trait ClusterMessage
  case object GetMembers extends ClusterMessage

  def props(nodeId: String) = Props(new ClusterManager(nodeId))
}

class ClusterManager(nodeId: String) extends Actor with ActorLogging {

  val cluster: Cluster = Cluster(context.system)
  val listener: ActorRef = context.actorOf(ClusterListener.props(nodeId, cluster), "clusterListener")

  override def receive: Receive = {
    case GetMembers => {
      sender() ! cluster.state.members
        .filter(_.status == MemberStatus.up)
        .map(_.address.toString)
        .toList
    }
  }
}
ProcessorRouter
The load-balancing among processors is handled by the ProcessorRouter
. It is created by the Node
actor, but this time all the required information is provided in the configuration of the system.
class Node(nodeId: String) extends Actor {
  //...
  val processorRouter: ActorRef = context.actorOf(FromConfig.props(Props.empty), "processorRouter")
  //...
}
Let’s analyse the relevant part in the application.conf
file.
akka {
  actor {
    ...
    deployment {
      /node/processorRouter {
        router = round-robin-group
        routees.paths = ["/user/node/processor"]
        cluster {
          enabled = on
          allow-local-routees = on
        }
      }
    }
  }
  ...
}
The first thing is to specify the path to the router actor, that is /node/processorRouter
. Inside that property we can configure the behaviour of the router:
router - this is the policy for the load balancing of messages. I chose the round-robin-group, but there are many others.
routees.paths - these are the paths to the actors that will receive the messages handled by the router. We are saying: “When you receive a message, look for the actors corresponding to these paths. Choose one according to the policy and forward the message to it.” Since we are using Cluster Aware Routers, the routees can be on any node of the cluster.
cluster.enabled - are we operating in a cluster? The answer is on, of course!
cluster.allow-local-routees - here we are allowing the router to choose a routee in its node.
Using this configuration we can create a router to load balance the work among our processors.
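To make the round-robin policy concrete, here is a minimal sketch of the selection logic in plain Scala - no Akka involved, and the routee names are made up for illustration:

```scala
// A toy round-robin selector: each call picks the next routee in turn,
// wrapping around at the end of the list.
class RoundRobin[A](routees: Vector[A]) {
  private var next = 0
  def select(): A = {
    val chosen = routees(next)
    next = (next + 1) % routees.size
    chosen
  }
}

val router = new RoundRobin(Vector("node1", "node2", "node3"))
val picks = (1 to 6).map(_ => router.select())
println(picks.mkString(", "))
// node1, node2, node3, node1, node2, node3
```

A cluster-aware round-robin-group router does essentially this, except that the "list" is the set of actors matching routees.paths across all the nodes of the cluster.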
Node
The root of our actor hierarchy is the Node
. It creates the children actors — ClusterManager
, Processor
, and ProcessorRouter
— and forwards the messages to the right one. Nothing complex here.
object Node {
  sealed trait NodeMessage
  case class GetFibonacci(n: Int) extends NodeMessage
  case object GetClusterMembers extends NodeMessage

  def props(nodeId: String) = Props(new Node(nodeId))
}

class Node(nodeId: String) extends Actor {

  val processor: ActorRef = context.actorOf(Processor.props(nodeId), "processor")
  val processorRouter: ActorRef = context.actorOf(FromConfig.props(Props.empty), "processorRouter")
  val clusterManager: ActorRef = context.actorOf(ClusterManager.props(nodeId), "clusterManager")

  override def receive: Receive = {
    case GetClusterMembers => clusterManager forward GetMembers
    case GetFibonacci(value) => processorRouter forward ComputeFibonacci(value)
  }
}
Server and API
Every node of our cluster runs a server able to receive requests. The Server
creates our actor system and is configured through the application.conf
file.
object Server extends App with NodeRoutes {

  implicit val system: ActorSystem = ActorSystem("cluster-playground")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  val config: Config = ConfigFactory.load()
  val address = config.getString("http.ip")
  val port = config.getInt("http.port")
  val nodeId = config.getString("clustering.ip")

  val node: ActorRef = system.actorOf(Node.props(nodeId), "node")

  lazy val routes: Route = healthRoute ~ statusRoutes ~ processRoutes

  Http().bindAndHandle(routes, address, port)

  println(s"Node $nodeId is listening at //$address:$port")

  Await.result(system.whenTerminated, Duration.Inf)
}
Akka HTTP powers the server itself and the REST API, exposing three simple endpoints. These endpoints are defined in the NodeRoutes
trait.
The first one is /health
, to check the health of a node. It responds with a 200 OK
if the node is up and running.
lazy val healthRoute: Route = pathPrefix("health") {
  concat(
    pathEnd {
      concat(
        get {
          complete(StatusCodes.OK)
        }
      )
    }
  )
}
The /status/members
endpoint responds with the current active members of the cluster.
lazy val statusRoutes: Route = pathPrefix("status") {
  concat(
    pathPrefix("members") {
      concat(
        pathEnd {
          concat(
            get {
              val membersFuture: Future[List[String]] = (node ? GetClusterMembers).mapTo[List[String]]
              onSuccess(membersFuture) { members =>
                complete(StatusCodes.OK, members)
              }
            }
          )
        }
      )
    }
  )
}
The last (but not the least) is the /process/fibonacci/n
endpoint, used to request the Fibonacci number of n
.
lazy val processRoutes: Route = pathPrefix("process") {
  concat(
    pathPrefix("fibonacci") {
      concat(
        path(IntNumber) { n =>
          pathEnd {
            concat(
              get {
                val processFuture: Future[ProcessorResponse] = (node ? GetFibonacci(n)).mapTo[ProcessorResponse]
                onSuccess(processFuture) { response =>
                  complete(StatusCodes.OK, response)
                }
              }
            )
          }
        }
      )
    }
  )
}
It responds with a ProcessorResponse
containing the result, along with the id of the node where the computation took place.
Cluster Configuration
Once we have all our actors, we need to configure the system to run as a cluster! The application.conf
file is where the magic takes place. I’m going to split it in pieces to present it better, but you can find the complete file here.
Let’s start defining some useful variables.
clustering {
  ip = "127.0.0.1"
  ip = ${?CLUSTER_IP}
  port = 2552
  port = ${?CLUSTER_PORT}
  seed-ip = "127.0.0.1"
  seed-ip = ${?CLUSTER_SEED_IP}
  seed-port = 2552
  seed-port = ${?CLUSTER_SEED_PORT}
  cluster.name = "cluster-playground"
}
Here we are simply defining the ip and port of the nodes and the seed, as well as the cluster name. We set a default value, then we override it if a new one is specified. The configuration of the cluster is the following.
akka {
  actor {
    provider = "cluster"
    ...
    /* router configuration */
    ...
  }
  remote {
    log-remote-lifecycle-events = on
    netty.tcp {
      hostname = ${clustering.ip}
      port = ${clustering.port}
    }
  }
  cluster {
    seed-nodes = [
      "akka.tcp://"${clustering.cluster.name}"@"${clustering.seed-ip}":"${clustering.seed-port}
    ]
    auto-down-unreachable-after = 10s
  }
}
...
/* server vars */
...
/* cluster vars */
Akka Cluster is built on top of Akka Remoting, so we need to configure it properly. First of all, we specify that we are going to use Akka Cluster by setting provider = "cluster". Then we bind clustering.ip and clustering.port to the hostname and port of the netty transport.
The cluster requires some seed nodes as its entry points. We set them in the seed-nodes array, in the format "akka.tcp://"${clustering.cluster.name}"@"${clustering.seed-ip}":"${clustering.seed-port}. Right now we have one seed node, but we may add more later.
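For instance, a two-seed setup might look like the sketch below. Both entries are hypothetical: seed-1 and seed-2 are made-up host names that would have to resolve inside the cluster network.

```hocon
akka.cluster.seed-nodes = [
  "akka.tcp://cluster-playground@seed-1:2552",
  "akka.tcp://cluster-playground@seed-2:2552"
]
```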
The auto-down-unreachable-after
property sets a member as down after it is unreachable for a period of time. This should be used only during development, as explained in the official documentation.
Ok, the cluster is configured, we can move to the next step: Dockerization and deployment!
Dockerization and deployment
To create the Docker container of our node we can use sbt-native-packager. Its installation is easy: add addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.15")
to the plugin.sbt
file in the project/
folder. This amazing tool has a plugin for the creation of Docker containers. It allows us to configure the properties of our Dockerfile in the build.sbt
file.
// other build.sbt properties

enablePlugins(JavaAppPackaging)
enablePlugins(DockerPlugin)
enablePlugins(AshScriptPlugin)

mainClass in Compile := Some("com.elleflorio.cluster.playground.Server")
dockerBaseImage := "java:8-jre-alpine"
version in Docker := "latest"
dockerExposedPorts := Seq(8000)
dockerRepository := Some("elleflorio")
Once we have set up the plugin, we can create the Docker image by running the command sbt docker:publishLocal. Run the command and taste the magic…
We have the Docker image of our node, now we need to deploy it and check that everything works fine. The easiest way is to create a docker-compose
file that will spawn a seed and a couple of other nodes.
version: '3.5'

networks:
  cluster-network:

services:
  seed:
    networks:
      - cluster-network
    image: elleflorio/akka-cluster-playground
    ports:
      - '2552:2552'
      - '8000:8000'
    environment:
      SERVER_IP: 0.0.0.0
      CLUSTER_IP: seed
      CLUSTER_SEED_IP: seed
  node1:
    networks:
      - cluster-network
    image: elleflorio/akka-cluster-playground
    ports:
      - '8001:8000'
    environment:
      SERVER_IP: 0.0.0.0
      CLUSTER_IP: node1
      CLUSTER_PORT: 1600
      CLUSTER_SEED_IP: seed
      CLUSTER_SEED_PORT: 2552
  node2:
    networks:
      - cluster-network
    image: elleflorio/akka-cluster-playground
    ports:
      - '8002:8000'
    environment:
      SERVER_IP: 0.0.0.0
      CLUSTER_IP: node2
      CLUSTER_PORT: 1600
      CLUSTER_SEED_IP: seed
      CLUSTER_SEED_PORT: 2552
I won’t spend time going through it, since it is quite simple.
Let’s run it!
Time to test our work! Once we run the docker-compose up command, we will have a cluster of three nodes up and running. The seed will respond to requests at port :8000, while node1 and node2 at ports :8001 and :8002. Play a bit with the various endpoints. You will see that the requests for a Fibonacci number will be computed by a different node each time, following a round-robin policy. That's good: we are proud of our work, and can get out for a beer to celebrate!
Conclusion
We are done here! We learned a lot of things in these ten minutes:
- What Akka Cluster is and what it can do for us.
- How to create a distributed application with it.
- How to configure a Group Router for load-balancing in the cluster.
- How to Dockerize everything and deploy it using docker-compose.
You can find the complete application in my GitHub repo. Feel free to contribute or play with it as you like!
See you!