1.什么是akka?
AKKA 是一个用于构建高并发、分布式和容错应用程序的开源框架。它基于Actor模型,提供了强大的并发抽象和工具,适用于各种业务场景。以下是一些使用AKKA框架的常见业务场景的示例:
实时数据处理:AKKA提供了轻量级的Actor模型,可以用于处理实时数据流。您可以创建多个Actor来处理数据的不同部分,并使用消息传递机制进行通信和协调。这在实时监控、实时分析和实时推送等场景中非常有用。
并发任务执行:AKKA的Actor模型使得并发任务的执行变得简单。您可以将任务分解为多个独立的Actor,并让它们并行地执行。每个Actor可以负责处理一部分任务,并通过消息传递进行协调和结果汇总。这在批处理、并行计算和任务调度等场景中非常有用。
分布式系统:AKKA提供了分布式Actor模型,可以在多个节点上分布Actor的实例。这使得构建分布式系统变得更加容易。您可以使用AKKA的远程Actor和集群功能来实现分布式的任务分发、数据共享和容错机制。
微服务架构:AKKA可以作为构建微服务架构的基础。每个微服务可以由一个或多个Actor组成,并使用消息传递进行通信。AKKA的容错机制和监督策略可以帮助实现高可用性和容错性的微服务。
实时通信和聊天应用:AKKA提供了高效的消息传递机制,适用于实时通信和聊天应用。每个用户可以由一个Actor表示,消息可以通过Actor之间的邮箱进行传递。这使得实现实时聊天、通知和协作功能变得更加简单。
Actor模型简介
Actor由状态(state)、行为(Behavior)和邮箱(mailBox)三部分组成
状态:Actor中的状态指的是Actor对象的变量信息,状态由Actor自己管理,避免了并发环境下的锁和内存原子性等问题
行为:行为指定的是Actor中计算逻辑,通过Actor接收到消息来改变Actor的状态
邮箱:邮箱是Actor和Actor之间的通信桥梁,邮箱内部通过FIFO消息队列来存储发送方Actor消息,接受方Actor从邮箱队列中获取消息
Actor 模型及其说明
Akka 处理并发的方法基于 Actor 模型。(示意图)
在基于 Actor 的系统里,所有的事物都是 Actor,就好像在面向对象设计里面所有的事物都是 对象一样。
Actor 模型是作为一个并发模型设计和架构的。Actor 与 Actor 之间只能通过消息通信,如图 的信封
Actor 与 Actor 之间只能用消息进行通信,当一个 Actor 给另外一个 Actor 发消息,消息是有 顺序的 (消息队列),只需要将消息投寄的相应的邮箱即可。
怎么处理消息是由接收消息的 Actor 决定的,发送消息 Actor 可以等待回复,也可以异步处理 【ajax】
ActorSystem 的职责是负责创建并管理其创建的 Actor, ActorSystem 是单例的 (可以 ActorSystem 是一个工厂,专门创建 Actor),一个 JVM 进程中有一个即可,而 Acotr 是可以有多个的。
Actor 模型是对并发模型进行了更高的抽象。
Actor 模型是异步、非阻塞、高性能的事件驱动编程模型。[案例:说明 什么是异步、非阻塞,最 经典的案例就是 ajax 异步请求处理]
Actor 模型是轻量级事件处理 (1GB 内存可容纳百万级别个 Actor),因此处理大并发性能高.
2.代码工程
实验目的
基于AKKA actor模型编程
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>springboot-demo</artifactId> <groupId>com.et</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>akka</artifactId> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-autoconfigure</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-actor_2.13</artifactId> <version>2.6.0</version> </dependency> <!-- Akka Streams --> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-stream_2.13</artifactId> <version>2.6.0</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> </dependencies> </project>
actor
传递string和int参数
package com.et.akka.actor; import akka.actor.AbstractActor; public class ActorNormal extends AbstractActor { //process msg @Override public Receive createReceive() { //Process a specific type of message, such as a string type message Receive build = receiveBuilder().match(String.class,(msg)-> { System.out.println(msg); sender().tell("response", self()); }).match(Integer.class,(msg)-> { System.out.println(msg+"1"); }).build(); return build; } }
传递对象参数
package com.et.akka.actor; import akka.actor.AbstractActor; import com.et.akka.model.User; public class ActorStruct extends AbstractActor { private final User user; public ActorStruct(User userModel){ this.user = userModel; } //process msg @Override public Receive createReceive() { Receive build = receiveBuilder().match(String.class,(msg)-> { System.out.println(msg); sender().tell(" I am a result of ActorStruct:"+user.getName(), self()); }).match(Integer.class,(msg)-> { System.out.println(msg+"1"); }).build(); return build; } }
controller
package com.et.akka.controller; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; import akka.pattern.Patterns; import akka.util.Timeout; import com.et.akka.actor.ActorNormal; import com.et.akka.actor.ActorStruct; import com.et.akka.model.User; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.RestController; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; import java.util.concurrent.TimeUnit; @RestController public class AkkaController { @GetMapping(value = "/Akka/AkkaSendString") @ResponseBody public void AkkaSendString() { //Creates system management objects for all management actors ActorSystem actorSystem = ActorSystem.create(); //use actorSystem.actorOf to define actorNormal as ActorRef ActorRef actor = actorSystem.actorOf(Props.create(ActorNormal.class), "actorNormal"); //Send message Object msg (the content of the message, any type of data), final ActorRef sender (indicates that there is no sender (actually an Actor called deadLetters)) actor.tell("kiba", ActorRef.noSender()); } @GetMapping(value = "/Akka/AkkaSendInt") @ResponseBody public void AkkaSendInt() { ActorSystem actorSystem = ActorSystem.create(); ActorRef actor = actorSystem.actorOf(Props.create(ActorNormal.class), "actorNormal"); actor.tell(518, ActorRef.noSender());//send int } @GetMapping(value = "/Akka/AkkaAsk") @ResponseBody public void AkkaAsk() { ActorSystem actorSystem = ActorSystem.create(); ActorRef actor = actorSystem.actorOf(Props.create(ActorNormal.class), "actorNormal"); Timeout timeout = new Timeout(Duration.create(2, TimeUnit.SECONDS)); Future<Object> future = Patterns.ask(actor, "hello", timeout); try { Object obj = Await.result(future, timeout.duration()); String reply = obj.toString(); System.out.println("reply msg: " + reply); } catch (Exception e) { e.printStackTrace(); } } @GetMapping(value = "/Akka/AkkaAskStruct") @ResponseBody public void AkkaAskStruct() { ActorSystem actorSystem = ActorSystem.create(); ActorRef actor = actorSystem.actorOf(Props.create(ActorStruct.class,new User(1,"kiba")), "actorNormal"); Timeout timeout = new Timeout(Duration.create(2, TimeUnit.SECONDS)); Future<Object> future = Patterns.ask(actor, "hello", timeout); try { Object obj = Await.result(future, timeout.duration()); String reply = obj.toString(); System.out.println("reply msg: " + reply); } catch (Exception e) { e.printStackTrace(); } } }
model
package com.et.akka.model; import lombok.AllArgsConstructor; import lombok.Data; /** * @author liuhaihua * @version 1.0 * @ClassName User * @Description todo * @date 2024/09/11/ 9:45 */ @Data @AllArgsConstructor public class User { private int age; private String name; }
以上只是一些关键代码,所有代码请参见下面代码仓库
代码仓库
https://github.com/Harries/springboot-demo(akka)
3.测试
启动Spring Boot应用
访问http://127.0.0.1:8088/Akka/AkkaSendString
访问http://127.0.0.1:8088/Akka/AkkaSendInt
访问http://127.0.0.1:8088/Akka/AkkaAsk
访问http://127.0.0.1:8088/Akka/AkkaAskStruct
还没有评论,来说两句吧...