Actor
Actor 是akka 中最核心也是最基础的单位, akka 的所有消息都是基于 actor 实现的, 创建一个 actor 便可以完成消息的接收/处理/发送/转发/路由等待功能.
定义一个actor, 先从构造开始
package actors; import akka.actor.AbstractActor; import akka.actor.Props; /** * Created by Dong Wang. * Created on 2018/9/8 10:56 */ public class UserLoginActor extends AbstractActor { // private Object parameters; public static Props props() { return Props.create(UserLoginActor.class, UserLoginActor::new); } private UserLoginActor() { } // 如果有构造参数, 便按照如下方式构造 Props 即可 // public static Props props(Object parameters) { // return Props.create(UserLoginActor.class, () -> new UserLoginActor(parameters)); // } // // private UserLoginActor(Object parameters) { // this.parameters = parameters; // } @Override public Receive createReceive() { return null; //后面再实现逻辑 } }
紧接着我们来定义一种消息类型, 由于消息需要在网络传输, 所以都定义为 final 类型的, 如果是分布式系统, 有必要序列化消息
package messages; /** * Created by Dong Wang. * Created on 2018/9/8 11:07 */ public final class User { private final long userId; private final String userName; private final int age; public User(long userId, String userName, int age) { this.userId = userId; this.userName = userName; this.age = age; } }
接着我们实现 Actor 接收到消息后的逻辑处理
@Override public Receive createReceive() { return receiveBuilder() .match(User.class, this::handleUser) .build(); } private void handleUser(User user) { // 处理逻辑, 比如回复一个消息 getSender().tell("成功收到用户消息!", ActorRef.noSender()); }
你可以继续写很多 match 事件, 比如
@Override public Receive createReceive() { return receiveBuilder() .match(User.class, this::handleUser) .match(Other.class, this::handleOthers) //.... .build(); }
我们定义的 actor 可能收到各种系统发来的消息, 不一定是 User.class 等等, 当没有匹配到的时候, 我们需要定义一个通用的处理逻辑, 相当于 else 的概念matchAny
先定义一个日志属性,可以用 akka 自带, 也可以集成第三方日志框架如 self4j
private LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
使用 matchAny
@Override public Receive createReceive() { return receiveBuilder() .match(User.class, this::handleUser) .matchAny(this::handleElse) .build(); } private void handleElse(Object messages) { log.info("UserLoginActor 接收到消息:{}", messages); }
ActorSystem
接下来就是如何使用这些 actor 啦, 首先我们需要一个最顶级的 actor来创建我们需要的其余 actor.
这里说一下, actor 是有层级的, 最顶级有3个, 其中供我们使用的是 actorSystem, 用它来创建我们需要的 actors, 然后用某个 actor 继续创建子 actor 即可. 所以理论上来actorSystem只有一个全局的即可
import actors.UserLoginActor; import akka.actor.ActorRef; import akka.actor.ActorSystem; import messages.User; /** * Created by Dong Wang. * Created on 2018/9/8 11:28 */ public class Main { public static void main(String[] args) throws InterruptedException { ActorSystem actorSystem = ActorSystem.create("actorSystemName"); ActorRef userLoginActor = actorSystem.actorOf(UserLoginActor.props(), "userLoginActorName"); userLoginActor.tell(new User(1, "Dong", 30), ActorRef.noSender()); userLoginActor.tell("哇", ActorRef.noSender()); // actorSystem.terminate(); // 这个方法终止 actor } }
所以, 收到 User 的时候, 我们主 actor 回收到回复, 收到 哇 的时候, 会打印日志
这里你可以使用 ask 来同步等待 actor 回复的消息, 比如
Duration timeout = Duration.ofSeconds(10); CompletionStage<Object> result = PatternsCS.ask(userLoginActor, new User(1, "Dong", 30), timeout); result.whenComplete((message, error) -> System.out.println("主方法:" + message));
对 akka 感兴趣的, 多关注我吧!