Akka 与 Java8 入门教程

Actor

Actor 是akka 中最核心也是最基础的单位, akka 的所有消息都是基于 actor 实现的, 创建一个 actor 便可以完成消息的接收/处理/发送/转发/路由等待功能.

定义一个actor, 先从构造开始

package actors;

import akka.actor.AbstractActor;
import akka.actor.Props;

/**
 * Created by Dong Wang.
 * Created on 2018/9/8 10:56
 */
public class UserLoginActor extends AbstractActor {
//    private Object parameters;

    public static Props props() {
        return Props.create(UserLoginActor.class, UserLoginActor::new);
    }

    private UserLoginActor() {
    }

    // 如果有构造参数, 便按照如下方式构造 Props 即可
//    public static Props props(Object parameters) {
//        return Props.create(UserLoginActor.class, () -> new UserLoginActor(parameters));
//    }
//
//    private UserLoginActor(Object parameters) {
//        this.parameters = parameters;
//    }

    @Override
    public Receive createReceive() {
        return null; //后面再实现逻辑
    }
}

紧接着我们来定义一种消息类型, 由于消息需要在网络传输, 所以都定义为 final 类型的, 如果是分布式系统, 有必要序列化消息

package messages;

/**
 * Created by Dong Wang.
 * Created on 2018/9/8 11:07
 */
public final class User {
    private final long userId;
    private final String userName;
    private final int age;

    public User(long userId, String userName, int age) {
        this.userId = userId;
        this.userName = userName;
        this.age = age;
    }
}

接着我们实现 Actor 接收到消息后的逻辑处理

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(User.class, this::handleUser)
                .build();
    }

    private void handleUser(User user) {
        // 处理逻辑, 比如回复一个消息
        getSender().tell("成功收到用户消息!", ActorRef.noSender());
    }

你可以继续写很多 match 事件, 比如

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(User.class, this::handleUser)
                .match(Other.class, this::handleOthers)
                //....
                .build();
    }

我们定义的 actor 可能收到各种系统发来的消息, 不一定是 User.class 等等, 当没有匹配到的时候, 我们需要定义一个通用的处理逻辑, 相当于 else 的概念matchAny

先定义一个日志属性,可以用 akka 自带, 也可以集成第三方日志框架如 self4j

private LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

使用 matchAny

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(User.class, this::handleUser)
                .matchAny(this::handleElse)
                .build();
    }


    private void handleElse(Object messages) {
        log.info("UserLoginActor 接收到消息:{}", messages);
    }

ActorSystem

接下来就是如何使用这些 actor 啦, 首先我们需要一个最顶级的 actor来创建我们需要的其余 actor.

这里说一下, actor 是有层级的, 最顶级有3个, 其中供我们使用的是 actorSystem, 用它来创建我们需要的 actors, 然后用某个 actor 继续创建子 actor 即可. 所以理论上来actorSystem只有一个全局的即可

import actors.UserLoginActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import messages.User;

/**
 * Created by Dong Wang.
 * Created on 2018/9/8 11:28
 */
public class Main {
    public static void main(String[] args) throws InterruptedException {
        ActorSystem actorSystem = ActorSystem.create("actorSystemName");

        ActorRef userLoginActor = actorSystem.actorOf(UserLoginActor.props(), "userLoginActorName");
        userLoginActor.tell(new User(1, "Dong", 30), ActorRef.noSender());
        userLoginActor.tell("哇", ActorRef.noSender());

//        actorSystem.terminate(); // 这个方法终止 actor
    }
}

所以, 收到 User 的时候, 我们主 actor 回收到回复, 收到 哇 的时候, 会打印日志

这里你可以使用 ask 来同步等待 actor 回复的消息, 比如

Duration timeout = Duration.ofSeconds(10);
CompletionStage<Object> result = PatternsCS.ask(userLoginActor, new User(1, "Dong", 30), timeout);
result.whenComplete((message, error) -> System.out.println("主方法:" + message));

对 akka 感兴趣的, 多关注我吧!

发表评论

您的电子邮箱地址不会被公开。 必填项已用 * 标注

Scroll to Top