feat: add akkabus

This commit is contained in:
2025-02-25 22:44:58 +07:00
parent 732661f3be
commit e2704bcf2d
3 changed files with 61 additions and 0 deletions
+2
View File
@@ -21,12 +21,14 @@ idea {
repositories {
mavenCentral()
maven("https://repo.akka.io/maven")
}
dependencies {
annotationProcessor("org.projectlombok:lombok:1.18.36")
compileOnly("org.projectlombok:lombok:1.18.36")
implementation("com.typesafe.akka:akka-actor_3:2.10.0")
implementation("org.slf4j:slf4j-simple:2.0.16")
testAnnotationProcessor("org.projectlombok:lombok:1.18.36")
@@ -0,0 +1,53 @@
package com.miti99.taskbus.bus.impl;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.routing.ConsistentHashingPool;
import akka.routing.ConsistentHashingRouter.ConsistentHashMapper;
import com.miti99.taskbus.bus.Bus;
import com.miti99.taskbus.task.Task;
import java.io.Closeable;
public class AkkaBus implements Bus, Closeable {
private static final int POOL_SIZE = Runtime.getRuntime().availableProcessors();
private final ActorSystem system = ActorSystem.create("AkkaBus");
private final ActorRef router;
public AkkaBus() {
router =
system.actorOf(
new ConsistentHashingPool(POOL_SIZE)
.withHashMapper(new TaskHashMapper())
.props(Props.create(TaskWorker.class)));
}
@Override
public void submit(Task task) {
router.tell(task, ActorRef.noSender());
}
@Override
public void close() {
system.terminate();
}
public static class TaskHashMapper implements ConsistentHashMapper {
@Override
public Object hashKey(Object message) {
if (message instanceof Task task) {
return task.hash() % POOL_SIZE;
} else {
return null;
}
}
}
public static class TaskWorker extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder().match(Task.class, Task::execute).build();
}
}
}
@@ -30,4 +30,10 @@ class BusTest {
Bus bus = new TaskBus();
submitTasks(bus);
}
@Test
void testAkkaBus() {
Bus bus = new AkkaBus();
submitTasks(bus);
}
}