Akka 集群单例,利用work pulling工作拉取模式,实现定时分发分布式任务

写在最前面

Akka 的开发语言有Java和Scala,本文设计到的编码属于Java。

Akka 的API 有classic 和typed 两种,本文所用到的属于typed API。

maven 提供了一整套依赖加载:

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <scala.version>2.6.17</scala.version>
        <scala.binary.version>2.13</scala.binary.version>
    </properties>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>com.typesafe.akka</groupId>
                <artifactId>akka-bom_${scala.binary.version}</artifactId>
                <version>${scala.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-cluster-typed_${scala.binary.version}</artifactId>
        </dependency>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-actor_${scala.binary.version}</artifactId>
        </dependency>
    </dependencies>

需求介绍

最近公司有个任务,在 Akka 的集群中,要实现一个定时任务,这个任务负责在数据库中每隔一段时间读取数据。然后把这些数据分发给负责处理数据的worker actor。

这里这个定时任务肯定也是一个actor,然后为了避免在分布式部署集群的节点中出现多个定时任务竞争资源,我们想到了用 akka 的集群单例来实现它。所谓集群单例(Cluster Singleton)就是只会在整个集群中实例化一个actor,然后它自带灾难重启功能,但是切记用集群单例的时候,不要启用Cluster downing strategy,不然会出现集群脑裂,单例失效。

然后定时任务就用 akka 自带的定时任务即可,很方便。

最后worker 用了pool方式的路由子worker,然后在集群中用工作拉取模式(work pulling)来实现生产者消费者模型即可。

实现代码

 

集群单列实现scheduler

        ClusterSingleton singleton = ClusterSingleton.get(system);
        singleton.init(SingletonActor.of(SchedulerBehavior.create(), "SchedulerBehavior"));

schedulerBehavior:

    public static Behavior<Command> create() {
        return Behaviors.setup(
                context -> {
                    ActorRef<WorkPullingProducerController.RequestNext<NotificationData>>
                            requestNextAdapter =
                            context.messageAdapter(WorkPullingProducerController.requestNextClass(), WrappedRequestNext::new);
                    ActorRef<WorkPullingProducerController.Command<NotificationData>>
                            producerController =
                            context.spawn(
                                    WorkPullingProducerController.create(
                                            NotificationData.class,
                                            "scheduleManager",
                                            workerServiceKey,
                                            Optional.empty()),
                                    "producerController");
                    producerController.tell(new WorkPullingProducerController.Start<>(requestNextAdapter));

                    return Behaviors.withStash(
                            1000, stashBuffer -> new SchedulerBehavior(context, stashBuffer).waitForNext());
                });
    }

workerBehavior

    public static Behavior<Command> create() {
        return Behaviors.setup(
                context -> {
                    PoolRouter<Command> pool =
                            Routers.pool(
                                            3,
                                            Behaviors.supervise(RealWorker.create()).onFailure(SupervisorStrategy.restart()))
                                    .withRoundRobinRouting();
                    ActorRef<Command> router = context.spawn(pool, "worker-pool");

                    ActorRef<ConsumerController.Delivery<NotificationData>> deliveryAdapter =
                            context.messageAdapter(ConsumerController.deliveryClass(), SchedulerMessage::new);
                    ActorRef<ConsumerController.Command<NotificationData>> consumerController =
                            context.spawn(ConsumerController.create(workerServiceKey), "consumerController");
                    consumerController.tell(new ConsumerController.Start<>(deliveryAdapter));

                    return new WorkerBehavior(context, router);
                });
    }

然后就是定时任务,往sceduler发信息即可

  context.getSystem().scheduler().scheduleWithFixedDelay(
                Duration.ZERO,
                Duration.ofSeconds(3),
                () -> {
                    SchedulerMessage schedulerMessage = new SchedulerMessage(
                            new ConsumerController.Delivery<>(
                                    new NotificationData(1L), null, "scheduleManager", 1
                            )
                    );
                    context.getSelf().tell(schedulerMessage);
                },
                ExecutionContext.global());

发表评论

您的电子邮箱地址不会被公开。 必填项已用 * 标注

Scroll to Top