AkkaClusterで簡単なアプリケーションを作成する方法

Scalachainについての私の前の話を読んだら、おそらくそれが分散システムではないことに気づいたでしょう。他のノードと適切に連携するためのすべての機能が不足しています。それに加えて、単一のノードで構成されるブロックチェーンは役に立たない。このため、私はこの問題に取り組む時が来たと判断しました。

ScalachainはAkkaを利用しているので、Akka Clusterで遊んでみませんか?Akka Clusterを少しいじくり回す簡単なプロジェクトを作成しました。このストーリーでは、学んだことを共有します。クラスター対応ルーターを使用してノード間の負荷を分散し、3つのノードのクラスターを作成します。すべてがDockerコンテナーで実行され、簡単にデプロイできるようにdocker-composeを使用します。

さあ、転がしてみよう!?

Akkaクラスターの簡単な紹介

Akka Clusterは、分散アプリケーションの作成を強力にサポートします。最良の使用例は、分散環境でN回複製するノードがある場合です。これは、N個のノードすべてが同じコードを実行しているピアであることを意味します。Akka Clusterを使用すると、同じクラスター内のメンバーをすぐに検出できます。クラスター対応ルーターを使用すると、異なるノードのアクター間でメッセージのバランスを取ることができます。バランスポリシーを選択して、負荷分散を簡単にすることもできます。

実際には、次の2種類のルーターから選択できます。

グループルーター—メッセージの送信先のアクター(ルートと呼ばれる)は、アクターパスを使用して指定されます。ルーターは、クラスターで作成されたルートを共有します。この例では、グループルーターを使用します。

プールルーター—ルートはルーターによって作成および展開されるため、アクター階層内の子になります。ルートはルーター間で共有されません。これは、各ルーターがプライマリであり、そのルーターがレプリカをルーティングするプライマリレプリカシナリオに最適です。

これは氷山の一角にすぎないので、より多くの洞察を得るために公式ドキュメントを読むことをお勧めします。

数学的計算のためのクラスター

ユースケースのシナリオを想像してみましょう。要求に応じて数学的計算を実行するシステムを設計するとします。システムはオンラインでデプロイされるため、計算要求を受信するにはRESTAPIが必要です。内部プロセッサがこれらの要求を処理し、計算を実行して結果を返します。

現在、プロセッサはフィボナッチ数しか計算できません。ノードのクラスターを使用して、ノード間で負荷を分散し、パフォーマンスを向上させることにしました。Akka Clusterは、クラスターのダイナミクスとノード間の負荷分散を処理します。いいですね!

アクター階層

まず最初に:アクター階層を定義する必要があります。システムは、ビジネスロジッククラスター管理、およびノード自体の3つの機能部分に分けることができます。サーバーもありますが、アクターではありません。後で作業します。

ビジネスの論理

アプリケーションは数学的な計算を行う必要があります。Processorすべての計算タスクを管理するための単純なアクターを定義できます。私たちがサポートするすべての計算は、特定のアクターで実装できます。特定のアクターはその子になりProcessorます。このように、アプリケーションはモジュール式であり、拡張と保守が容易です。今の唯一の子はProcessorとなりますProcessorFibonacci俳優。私はあなたがその仕事が何であるかを推測できると思います。これで開始できます。

クラスター管理

クラスターを管理するには、が必要ClusterManagerです。簡単そうですね。このアクターは、要求されたときにメンバーを返すなど、クラスターに関連するすべてを処理します。クラスタ内で何が起こっているかをログに記録すると便利なので、ClusterListenerアクターを定義します。これはの子でありClusterManager、それらをログに記録するクラスターイベントをサブスクライブします。

ノード

Node俳優は私達の階層のルートです。APIと通信するのはシステムのエントリポイントです。一緒に、その子である俳優。これはシステムのロードバランサーであり、s間で負荷を分散します。これをクラスター対応ルーターとして構成し、すべてのノードがすべてのノードのにメッセージを送信できるようにします。ProcessorClusterManagerProcessorRouterProcessorProcessorRouterProcessor

アクターの実装

アクターを実装する時が来ました!まず、システムのビジネスロジックに関連するアクターを実装します。次に、クラスター管理のアクターNodeと、最後にルートアクター()に移ります。

ProcessorFibonacci

このアクターは、フィボナッチ数の計算を実行します。Compute計算する番号と応答するアクターの参照を含むメッセージを受信します。要求するアクターが異なる可能性があるため、参照は重要です。分散環境で作業していることを忘れないでください。

一旦Computeメッセージが受信され、fibonacci関数は、結果を計算します。それをProcessorResponseオブジェクトでラップして、計算を実行したノードに関する情報を提供します。これは、後でラウンドロビンポリシーの動作を確認するのに役立ちます。

結果は、返信する必要のあるアクターに送信されます。簡単-簡単。

object ProcessorFibonacci { sealed trait ProcessorFibonacciMessage case class Compute(n: Int, replyTo: ActorRef) extends ProcessorFibonacciMessage def props(nodeId: String) = Props(new ProcessorFibonacci(nodeId)) def fibonacci(x: Int): BigInt = { @tailrec def fibHelper(x: Int, prev: BigInt = 0, next: BigInt = 1): BigInt = x match { case 0 => prev case 1 => next case _ => fibHelper(x - 1, next, next + prev) } fibHelper(x) } } class ProcessorFibonacci(nodeId: String) extends Actor { import ProcessorFibonacci._ override def receive: Receive = { case Compute(value, replyTo) => { replyTo ! ProcessorResponse(nodeId, fibonacci(value)) } } }

プロセッサー

The Processor actor manages the specific sub-processors, like the Fibonacci one. It should instantiate the sub-processors and forward the requests to them. Right now we only have one sub-processor, so the Processor receives one kind of message: ComputeFibonacci. This message contains the Fibonacci number to compute. Once received, the number to compute is sent to a FibonacciProcessor, along with the reference of the sender().

object Processor { sealed trait ProcessorMessage case class ComputeFibonacci(n: Int) extends ProcessorMessage def props(nodeId: String) = Props(new Processor(nodeId)) } class Processor(nodeId: String) extends Actor { import Processor._ val fibonacciProcessor: ActorRef = context.actorOf(ProcessorFibonacci.props(nodeId), "fibonacci") override def receive: Receive = { case ComputeFibonacci(value) => { val replyTo = sender() fibonacciProcessor ! Compute(value, replyTo) } } }

ClusterListener

We would like to log useful information about what happens in the cluster. This could help us to debug the system if we need to. This is the purpose of the ClusterListener actor. Before starting, it subscribes itself to the event messages of the cluster. The actor reacts to messages like MemberUp, UnreachableMember, or MemberRemoved, logging the corresponding event. When ClusterListener is stopped, it unsubscribes itself from the cluster events.

object ClusterListener { def props(nodeId: String, cluster: Cluster) = Props(new ClusterListener(nodeId, cluster)) } class ClusterListener(nodeId: String, cluster: Cluster) extends Actor with ActorLogging { override def preStart(): Unit = { cluster.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberEvent], classOf[UnreachableMember]) } override def postStop(): Unit = cluster.unsubscribe(self) def receive = { case MemberUp(member) => log.info("Node {} - Member is Up: {}", nodeId, member.address) case UnreachableMember(member) => log.info(s"Node {} - Member detected as unreachable: {}", nodeId, member) case MemberRemoved(member, previousStatus) => log.info(s"Node {} - Member is Removed: {} after {}", nodeId, member.address, previousStatus) case _: MemberEvent => // ignore } }

ClusterManager

The actor responsible of the management of the cluster is ClusterManager. It creates the ClusterListener actor, and provides the list of cluster members upon request. It could be extended to add more functionalities, but right now this is enough.

object ClusterManager { sealed trait ClusterMessage case object GetMembers extends ClusterMessage def props(nodeId: String) = Props(new ClusterManager(nodeId)) } class ClusterManager(nodeId: String) extends Actor with ActorLogging { val cluster: Cluster = Cluster(context.system) val listener: ActorRef = context.actorOf(ClusterListener.props(nodeId, cluster), "clusterListener") override def receive: Receive = { case GetMembers => { sender() ! cluster.state.members.filter(_.status == MemberStatus.up) .map(_.address.toString) .toList } } }

ProcessorRouter

The load-balancing among processors is handled by the ProcessorRouter. It is created by the Node actor, but this time all the required information are provided in the configuration of the system.

class Node(nodeId: String) extends Actor { //... val processorRouter: ActorRef = context.actorOf(FromConfig.props(Props.empty), "processorRouter") //... }

Let’s analyse the relevant part in the application.conf file.

akka { actor { ... deployment { /node/processorRouter { router = round-robin-group routees.paths = ["/user/node/processor"] cluster { enabled = on allow-local-routees = on } } } } ... }

The first thing is to specify the path to the router actor, that is /node/processorRouter. Inside that property we can configure the behaviour of the router:

  • router: this is the policy for the load balancing of messages. I chose the round-robin-group, but there are many others.
  • routees.paths: these are the paths to the actors that will receive the messages handled by the router. We are saying: “When you receive a message, look for the actors corresponding to these paths. Choose one according to the policy and forward the message to it.” Since we are using Cluster Aware Routers, the routees can be on any node of the cluster.
  • cluster.enabled: are we operating in a cluster? The answer is on, of course!
  • cluster.allow-local-routees: here we are allowing the router to choose a routee in its node.

Using this configuration we can create a router to load balance the work among our processors.

Node

The root of our actor hierarchy is the Node. It creates the children actors — ClusterManager, Processor, and ProcessorRouter — and forwards the messages to the right one. Nothing complex here.

object Node { sealed trait NodeMessage case class GetFibonacci(n: Int) case object GetClusterMembers def props(nodeId: String) = Props(new Node(nodeId)) } class Node(nodeId: String) extends Actor { val processor: ActorRef = context.actorOf(Processor.props(nodeId), "processor") val processorRouter: ActorRef = context.actorOf(FromConfig.props(Props.empty), "processorRouter") val clusterManager: ActorRef = context.actorOf(ClusterManager.props(nodeId), "clusterManager") override def receive: Receive = { case GetClusterMembers => clusterManager forward GetMembers case GetFibonacci(value) => processorRouter forward ComputeFibonacci(value) } }

Server and API

Every node of our cluster runs a server able to receive requests. The Server creates our actor system and is configured through the application.conf file.

object Server extends App with NodeRoutes { implicit val system: ActorSystem = ActorSystem("cluster-playground") implicit val materializer: ActorMaterializer = ActorMaterializer() val config: Config = ConfigFactory.load() val address = config.getString("http.ip") val port = config.getInt("http.port") val nodeId = config.getString("clustering.ip") val node: ActorRef = system.actorOf(Node.props(nodeId), "node") lazy val routes: Route = healthRoute ~ statusRoutes ~ processRoutes Http().bindAndHandle(routes, address, port) println(s"Node $nodeId is listening at //$address:$port") Await.result(system.whenTerminated, Duration.Inf) }

Akka HTTP powers the server itself and the REST API, exposing three simple endpoints. These endpoints are defined in the NodeRoutes trait.

The first one is /health, to check the health of a node. It responds with a 200 OK if the node is up and running

lazy val healthRoute: Route = pathPrefix("health") { concat( pathEnd { concat( get { complete(StatusCodes.OK) } ) } ) }

The /status/members endpoint responds with the current active members of the cluster.

lazy val statusRoutes: Route = pathPrefix("status") { concat( pathPrefix("members") { concat( pathEnd { concat( get { val membersFuture: Future[List[String]] = (node ? GetClusterMembers).mapTo[List[String]] onSuccess(membersFuture) { members => complete(StatusCodes.OK, members) } } ) } ) } ) }

The last (but not the least) is the /process/fibonacci/n endpoint, used to request the Fibonacci number of n.

lazy val processRoutes: Route = pathPrefix("process") { concat( pathPrefix("fibonacci") { concat( path(IntNumber) { n => pathEnd { concat( get { val processFuture: Future[ProcessorResponse] = (node ? GetFibonacci(n)).mapTo[ProcessorResponse] onSuccess(processFuture) { response => complete(StatusCodes.OK, response) } } ) } } ) } ) }

It responds with a ProcessorResponse containing the result, along with the id of the node where the computation took place.

Cluster Configuration

Once we have all our actors, we need to configure the system to run as a cluster! The application.conf file is where the magic takes place. I’m going to split it in pieces to present it better, but you can find the complete file here.

Let’s start defining some useful variables.

clustering { ip = "127.0.0.1" ip = ${?CLUSTER_IP} port = 2552 port = ${?CLUSTER_PORT} seed-ip = "127.0.0.1" seed-ip = ${?CLUSTER_SEED_IP} seed-port = 2552 seed-port = ${?CLUSTER_SEED_PORT} cluster.name = "cluster-playground" }

Here we are simply defining the ip and port of the nodes and the seed, as well as the cluster name. We set a default value, then we override it if a new one is specified. The configuration of the cluster is the following.

akka { actor { provider = "cluster" ... /* router configuration */ ... } remote { log-remote-lifecycle-events = on netty.tcp { hostname = ${clustering.ip} port = ${clustering.port} } } cluster { seed-nodes = [ "akka.tcp://"${clustering.cluster.name}"@"${clustering.seed-ip}":"${clustering.seed-port} ] auto-down-unreachable-after = 10s } } ... /* server vars */ ... /* cluster vars */ }

Akka Cluster is build on top of Akka Remoting, so we need to configure it properly. First of all, we specify that we are going to use Akka Cluster saying that provider = "cluster". Then we bind cluster.ip and cluster.port to the hostname and port of the netty web framework.

The cluster requires some seed nodes as its entry points. We set them in the seed-nodes array, in the format akka.tcp://"{clustering.cluster.name}"@"{clustering.seed-ip}":”${clustering.seed-port}”. Right now we have one seed node, but we may add more later.

The auto-down-unreachable-after property sets a member as down after it is unreachable for a period of time. This should be used only during development, as explained in the official documentation.

Ok, the cluster is configured, we can move to the next step: Dockerization and deployment!

Dockerization and deployment

To create the Docker container of our node we can use sbt-native-packager. Its installation is easy: add addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.15") to the plugin.sbt file in the project/ folder. This amazing tool has a plugin for the creation of Docker containers. it allows us to configure the properties of our Dockerfile in the build.sbt file.

// other build.sbt properties enablePlugins(JavaAppPackaging) enablePlugins(DockerPlugin) enablePlugins(AshScriptPlugin) mainClass in Compile := Some("com.elleflorio.cluster.playground.Server") dockerBaseImage := "java:8-jre-alpine" version in Docker := "latest" dockerExposedPorts := Seq(8000) dockerRepository := Some("elleflorio")

Once we have setup the plugin, we can create the docker image running the command sbt docker:publishLocal. Run the command and taste the magic… ?

We have the Docker image of our node, now we need to deploy it and check that everything works fine. The easiest way is to create a docker-compose file that will spawn a seed and a couple of other nodes.

version: '3.5' networks: cluster-network: services: seed: networks: - cluster-network image: elleflorio/akka-cluster-playground ports: - '2552:2552' - '8000:8000' environment: SERVER_IP: 0.0.0.0 CLUSTER_IP: seed CLUSTER_SEED_IP: seed node1: networks: - cluster-network image: elleflorio/akka-cluster-playground ports: - '8001:8000' environment: SERVER_IP: 0.0.0.0 CLUSTER_IP: node1 CLUSTER_PORT: 1600 CLUSTER_SEED_IP: seed CLUSTER_SEED_PORT: 2552 node2: networks: - cluster-network image: elleflorio/akka-cluster-playground ports: - '8002:8000' environment: SERVER_IP: 0.0.0.0 CLUSTER_IP: node2 CLUSTER_PORT: 1600 CLUSTER_SEED_IP: seed CLUSTER_SEED_PORT: 2552

I won’t spend time going through it, since it is quite simple.

Let’s run it!

Time to test our work! Once we run the docker-compose up command, we will have a cluster of three nodes up and running. The seed will respond to requests at port :8000, while node1 and node2 at port :8001 and :8002. Play a bit with the various endpoints. You will see that the requests for a Fibonacci number will be computed by a different node each time, following a round-robin policy. That’s good, we are proud of our work and can get out for a beer to celebrate! ?

Conclusion

We are done here! We learned a lot of things in these ten minutes:

  • What Akka Cluster is and what can do for us.
  • How to create a distributed application with it.
  • How to configure a Group Router for load-balancing in the cluster.
  • How to Dockerize everything and deploy it using docker-compose.

You can find the complete application in my GitHub repo. Feel free to contribute or play with it as you like! ?

See you! ?