From e2704bcf2d7effd4957168dd54778249a8a62372 Mon Sep 17 00:00:00 2001 From: tiennm99 Date: Tue, 25 Feb 2025 22:44:58 +0700 Subject: [PATCH] feat: add akkabus --- build.gradle.kts | 2 + .../com/miti99/taskbus/bus/impl/AkkaBus.java | 53 +++++++++++++++++++ .../com/miti99/taskbus/bus/impl/BusTest.java | 6 +++ 3 files changed, 61 insertions(+) create mode 100644 src/main/java/com/miti99/taskbus/bus/impl/AkkaBus.java diff --git a/build.gradle.kts b/build.gradle.kts index 0d6c1f0..360af39 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -21,12 +21,14 @@ idea { repositories { mavenCentral() + maven("https://repo.akka.io/maven") } dependencies { annotationProcessor("org.projectlombok:lombok:1.18.36") compileOnly("org.projectlombok:lombok:1.18.36") + implementation("com.typesafe.akka:akka-actor_3:2.10.0") implementation("org.slf4j:slf4j-simple:2.0.16") testAnnotationProcessor("org.projectlombok:lombok:1.18.36") diff --git a/src/main/java/com/miti99/taskbus/bus/impl/AkkaBus.java b/src/main/java/com/miti99/taskbus/bus/impl/AkkaBus.java new file mode 100644 index 0000000..9fc37ff --- /dev/null +++ b/src/main/java/com/miti99/taskbus/bus/impl/AkkaBus.java @@ -0,0 +1,53 @@ +package com.miti99.taskbus.bus.impl; + +import akka.actor.AbstractActor; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.routing.ConsistentHashingPool; +import akka.routing.ConsistentHashingRouter.ConsistentHashMapper; +import com.miti99.taskbus.bus.Bus; +import com.miti99.taskbus.task.Task; +import java.io.Closeable; + +public class AkkaBus implements Bus, Closeable { + private static final int POOL_SIZE = Runtime.getRuntime().availableProcessors(); + private final ActorSystem system = ActorSystem.create("AkkaBus"); + private final ActorRef router; + + public AkkaBus() { + router = + system.actorOf( + new ConsistentHashingPool(POOL_SIZE) + .withHashMapper(new TaskHashMapper()) + .props(Props.create(TaskWorker.class))); + } + + @Override + public void submit(Task task) { + router.tell(task, ActorRef.noSender()); + } + + @Override + public void close() { + system.terminate(); + } + + public static class TaskHashMapper implements ConsistentHashMapper { + @Override + public Object hashKey(Object message) { + if (message instanceof Task task) { + return task.hash() % POOL_SIZE; + } else { + return null; + } + } + } + + public static class TaskWorker extends AbstractActor { + @Override + public Receive createReceive() { + return receiveBuilder().match(Task.class, Task::execute).build(); + } + } +} diff --git a/src/test/java/com/miti99/taskbus/bus/impl/BusTest.java b/src/test/java/com/miti99/taskbus/bus/impl/BusTest.java index 8f7dcca..e141afb 100644 --- a/src/test/java/com/miti99/taskbus/bus/impl/BusTest.java +++ b/src/test/java/com/miti99/taskbus/bus/impl/BusTest.java @@ -30,4 +30,10 @@ class BusTest { Bus bus = new TaskBus(); submitTasks(bus); } + + @Test + void testAkkaBus() { + Bus bus = new AkkaBus(); + submitTasks(bus); + } }