From d0cb75bf4ac5bb33a7160beea87abd1b3f46bd0f Mon Sep 17 00:00:00 2001 From: sun Date: Wed, 20 Sep 2023 16:50:52 +0800 Subject: [PATCH] feat[virtual thread]: support virtual thread in net --- .../com/zfoo/net/anno/PacketReceiver.java | 1 + net/src/main/java/com/zfoo/net/anno/Task.java | 27 ++++ .../com/zfoo/net/packet/PacketService.java | 3 +- .../java/com/zfoo/net/router/IRouter.java | 5 +- .../main/java/com/zfoo/net/router/Router.java | 150 ++++++++++++++++-- .../net/router/receiver/EnhanceUtils.java | 9 ++ .../net/router/receiver/IPacketReceiver.java | 3 + .../receiver/PacketReceiverDefinition.java | 22 ++- .../com/zfoo/net/router/route/PacketBus.java | 126 --------------- .../com/zfoo/net/task/PacketReceiverTask.java | 2 +- .../main/java/com/zfoo/net/task/TaskBus.java | 31 ---- 11 files changed, 203 insertions(+), 176 deletions(-) create mode 100644 net/src/main/java/com/zfoo/net/anno/Task.java delete mode 100644 net/src/main/java/com/zfoo/net/router/route/PacketBus.java diff --git a/net/src/main/java/com/zfoo/net/anno/PacketReceiver.java b/net/src/main/java/com/zfoo/net/anno/PacketReceiver.java index 73c61449..bfa33174 100644 --- a/net/src/main/java/com/zfoo/net/anno/PacketReceiver.java +++ b/net/src/main/java/com/zfoo/net/anno/PacketReceiver.java @@ -25,4 +25,5 @@ import java.lang.annotation.*; @Target({ElementType.METHOD}) @Reflective public @interface PacketReceiver { + Task value() default Task.TaskBus; } diff --git a/net/src/main/java/com/zfoo/net/anno/Task.java b/net/src/main/java/com/zfoo/net/anno/Task.java new file mode 100644 index 00000000..d8781b61 --- /dev/null +++ b/net/src/main/java/com/zfoo/net/anno/Task.java @@ -0,0 +1,27 @@ +/* + * Copyright (C) 2020 The zfoo Authors + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + */ +package com.zfoo.net.anno; + +/** + * task thread type + * + * @author godotg + */ +public enum Task { + + TaskBus, + + NettyIO, + + VirtualThread; + +} diff --git a/net/src/main/java/com/zfoo/net/packet/PacketService.java b/net/src/main/java/com/zfoo/net/packet/PacketService.java index e00b43ce..3d240050 100644 --- a/net/src/main/java/com/zfoo/net/packet/PacketService.java +++ b/net/src/main/java/com/zfoo/net/packet/PacketService.java @@ -14,7 +14,6 @@ package com.zfoo.net.packet; import com.zfoo.net.NetContext; import com.zfoo.net.router.attachment.SignalAttachment; -import com.zfoo.net.router.route.PacketBus; import com.zfoo.protocol.ProtocolManager; import com.zfoo.protocol.buffer.ByteBufUtils; import com.zfoo.protocol.collection.CollectionUtils; @@ -121,7 +120,7 @@ public class PacketService implements IPacketService { // 注册协议接收器 var componentBeans = applicationContext.getBeansWithAnnotation(Component.class); for (var bean : componentBeans.values()) { - PacketBus.registerPacketReceiverDefinition(bean); + NetContext.getRouter().registerPacketReceiverDefinition(bean); } } diff --git a/net/src/main/java/com/zfoo/net/router/IRouter.java b/net/src/main/java/com/zfoo/net/router/IRouter.java index c299e9c1..e57f72ca 100644 --- a/net/src/main/java/com/zfoo/net/router/IRouter.java +++ b/net/src/main/java/com/zfoo/net/router/IRouter.java @@ -16,6 +16,7 @@ package com.zfoo.net.router; import com.zfoo.net.router.answer.AsyncAnswer; import com.zfoo.net.router.answer.SyncAnswer; import com.zfoo.net.session.Session; +import com.zfoo.net.task.PacketReceiverTask; import org.springframework.lang.Nullable; /** @@ -33,7 +34,9 @@ public interface IRouter { void receive(Session session, Object packet, @Nullable Object attachment); - void atReceiver(Session session, Object packet, @Nullable Object attachment); + void atReceiver(PacketReceiverTask packetReceiverTask); + + void registerPacketReceiverDefinition(Object bean); /** * attention:syncAsk和asyncAsk只能客户端调用 diff --git a/net/src/main/java/com/zfoo/net/router/Router.java b/net/src/main/java/com/zfoo/net/router/Router.java index 40e19924..ca417882 100644 --- a/net/src/main/java/com/zfoo/net/router/Router.java +++ b/net/src/main/java/com/zfoo/net/router/Router.java @@ -15,34 +15,43 @@ package com.zfoo.net.router; import com.zfoo.event.manager.EventBus; import com.zfoo.net.NetContext; +import com.zfoo.net.anno.PacketReceiver; import com.zfoo.net.core.event.ServerExceptionEvent; import com.zfoo.net.core.gateway.model.AuthUidToGatewayCheck; import com.zfoo.net.core.gateway.model.AuthUidToGatewayConfirm; import com.zfoo.net.core.gateway.model.AuthUidToGatewayEvent; import com.zfoo.net.packet.EncodedPacketInfo; +import com.zfoo.net.packet.PacketService; import com.zfoo.net.packet.common.Error; import com.zfoo.net.packet.common.Heartbeat; import com.zfoo.net.router.answer.AsyncAnswer; import com.zfoo.net.router.answer.SyncAnswer; +import com.zfoo.net.router.attachment.AttachmentType; import com.zfoo.net.router.attachment.GatewayAttachment; import com.zfoo.net.router.attachment.HttpAttachment; import com.zfoo.net.router.attachment.SignalAttachment; import com.zfoo.net.router.exception.ErrorResponseException; import com.zfoo.net.router.exception.NetTimeOutException; import com.zfoo.net.router.exception.UnexpectedProtocolException; -import com.zfoo.net.router.route.PacketBus; +import com.zfoo.net.router.receiver.EnhanceUtils; +import com.zfoo.net.router.receiver.IPacketReceiver; +import com.zfoo.net.router.receiver.PacketReceiverDefinition; import com.zfoo.net.router.route.SignalBridge; import com.zfoo.net.session.Session; import com.zfoo.net.task.PacketReceiverTask; import com.zfoo.net.task.TaskBus; +import com.zfoo.protocol.ProtocolManager; +import com.zfoo.protocol.collection.ArrayUtils; import com.zfoo.protocol.exception.ExceptionUtils; -import com.zfoo.protocol.util.JsonUtils; -import com.zfoo.protocol.util.StringUtils; +import com.zfoo.protocol.exception.RunException; +import com.zfoo.protocol.util.*; +import io.netty.util.collection.ShortObjectHashMap; import io.netty.util.concurrent.FastThreadLocal; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.lang.Nullable; +import java.lang.reflect.Modifier; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -57,6 +66,8 @@ public class Router implements IRouter { public static final long DEFAULT_TIMEOUT = 3000; + private final ShortObjectHashMap receiverMap = new ShortObjectHashMap<>(); + /** * 作为服务器接收方,会把receive收到的attachment存储在这个地方,只针对task线程。 * atReceiver会设置attachment,但是在方法调用完成会取消,不需要过多关注。 @@ -78,7 +89,7 @@ public class Router implements IRouter { if (attachment == null) { // 正常发送消息的接收,把客户端的业务请求包装下到路由策略指定的线程进行业务处理 // 注意:像客户端以asyncAsk发送请求,在服务器处理完后返回结果,在请求方也是进入这个receive方法,但是attachment不为空,会提前return掉不会走到这 - TaskBus.dispatchBySession(task); + dispatchBySession(task); return; } @@ -88,13 +99,13 @@ public class Router implements IRouter { if (signalAttachment.getClient() == SignalAttachment.SIGNAL_OUTSIDE_CLIENT) { // 服务器收到外部客户端的SIGNAL_OUTSIDE_CLIENT,不做任何处理 - TaskBus.dispatchBySession(task); + dispatchBySession(task); } else if (signalAttachment.getClient() == SignalAttachment.SIGNAL_NATIVE_ARGUMENT_CLIENT) { signalAttachment.setClient(SignalAttachment.SIGNAL_SERVER); - TaskBus.dispatchByTaskExecutorHash(signalAttachment.getTaskExecutorHash(), task); + dispatchByTaskExecutorHash(signalAttachment.getTaskExecutorHash(), task); } else if (signalAttachment.getClient() == SignalAttachment.SIGNAL_NATIVE_NO_ARGUMENT_CLIENT) { signalAttachment.setClient(SignalAttachment.SIGNAL_SERVER); - TaskBus.dispatchBySession(task); + dispatchBySession(task); } else { // 客户端收到服务器应答,客户端发送的时候client为SIGNAL_NATIVE_CLIENT,服务器收到的时候将其设置为SIGNAL_SERVER var removedAttachment = (SignalAttachment) SignalBridge.removeSignalAttachment(signalAttachment); @@ -116,7 +127,7 @@ public class Router implements IRouter { // 注意:此时并没有return,这样子网关的消息才能发给home,在home进行处理LogoutRequest消息的处理 if (gatewayAttachment.isClient()) { gatewayAttachment.setClient(false); - TaskBus.dispatchByTaskExecutorHash(gatewayAttachment.taskExecutorHash(), task); + dispatchByTaskExecutorHash(gatewayAttachment.taskExecutorHash(), task); } else { // 这里是:别的服务提供者提供授权给网关,比如:在玩家登录后,home服查到了玩家uid,然后发给Gateway服 var gatewaySession = NetContext.getSessionManager().getServerSession(gatewayAttachment.getSid()); @@ -146,11 +157,55 @@ public class Router implements IRouter { if (attachment.getClass() == HttpAttachment.class) { var httpAttachment = (HttpAttachment) attachment; - TaskBus.dispatchByTaskExecutorHash(httpAttachment.getTaskExecutorHash(), task); + dispatchByTaskExecutorHash(httpAttachment.getTaskExecutorHash(), task); return; } - TaskBus.dispatchBySession(task); + dispatchBySession(task); + } + + /** + * Actor模型,最主要的就是线程模型,Actor模型保证了某个Actor所代表的任务永远不会同时在两条线程同时处理任务,这就避免了并发。 + * 无论是Java,Kotlin,Scala都没有真正的协程,所以最终做到Actor模型的只能是细致的控制线程。 + *

+ * zfoo中通过对线程池的细粒度控制,从而实现了Actor模型。 + * 为了简单,可以把Actor可以理解为一个用户或者一个玩家。 + * 因为同一个用户或者玩家的uid是固定的,通过uid去计算一致性hash(taskExecutorHash)永远会得到一致的结果, + * 从而保证同一个用户或者玩家的请求总能通过taskExecutorHash被路由到同一台服务器的同一个线程去执行,从而避免了并发,实现了无锁化。 + *

+ * zfoo所代表的Actor模型,是更加精简的Actor模型,让上层调用无感知,在zfoo中可以简单的理解 actor = taskExecutorHash。 + *

+ * 在zfoo这套线程模型中,保证了服务器所接收到的Packet(最终被包装成PacketReceiverTask任务),永远只会在同一条线程处理, + * TaskBus通过AbstractTaskDispatch去派发PacketReceiverTask任务,具体在哪个线程处理通过IAttachment的taskExecutorHash计算。 + *

+ * IAttachment的不同,taskExecutorHash也不同: + * GatewayAttachment:默认是taskExecutorHash等于用户活玩家的uid,也可以通过IGatewayLoadBalancer接口指定 + * SignalAttachment:taskExecutorHash通过IRouter和IConsumer的argument参数指定 + */ + public void dispatchBySession(PacketReceiverTask task) { + var session = task.getSession(); + var uid = session.getUid(); + if (uid > 0) { + dispatchByTaskExecutorHash((int) uid, task); + } else { + dispatchByTaskExecutorHash((int) session.getSid(), task); + } + } + + public void dispatchByTaskExecutorHash(int taskExecutorHash, PacketReceiverTask packetReceiverTask) { + var packet = packetReceiverTask.getPacket(); + var clazz = packet.getClass(); + var receiver = receiverMap.get(ProtocolManager.protocolId(clazz)); + if (receiver == null) { + var name = packet.getClass().getSimpleName(); + throw new RuntimeException(StringUtils.format("no any packetReceiver:[at{}] found for this packet:[{}] or no GatewayAttachment sent back if this server is gateway", name, name)); + } + + switch (receiver.task()) { + case TaskBus -> TaskBus.execute(taskExecutorHash, packetReceiverTask); + case NettyIO -> atReceiver(packetReceiverTask); + case VirtualThread -> Thread.ofVirtual().name("virtual-at" + clazz.getSimpleName()).start(() -> atReceiver(packetReceiverTask)); + } } @Override @@ -312,16 +367,19 @@ public class Router implements IRouter { * 接收者同时只能处理一个session的一个包,同一个发送者发送过来的包排队处理 */ @Override - public void atReceiver(Session session, Object packet, Object attachment) { + public void atReceiver(PacketReceiverTask packetReceiverTask) { + var session = packetReceiverTask.getSession(); + var packet = packetReceiverTask.getPacket(); + var attachment = packetReceiverTask.getAttachment(); try { // 接收者(服务器)同步和异步消息的接收 if (attachment != null) { serverReceiverAttachmentThreadLocal.set(attachment); } - // 调用PacketReceiver,进行真正的业务处理,这个submit只是根据packet找到protocolId,然后调用对应的消息处理方法 - // 这个在哪个线程处理取决于:这个上层的PacketReceiverTask被丢到了哪个线程中 - PacketBus.route(session, packet, attachment); + // The routing of the message + var receiver = receiverMap.get(ProtocolManager.protocolId(packet.getClass())); + receiver.invoke(session, packet, attachment); } catch (Exception e) { EventBus.post(ServerExceptionEvent.valueOf(session, packet, attachment, e)); logger.error(StringUtils.format("e[uid:{}][sid:{}] unknown exception", session.getUid(), session.getSid(), e.getMessage()), e); @@ -335,4 +393,68 @@ public class Router implements IRouter { } } + + @Override + public void registerPacketReceiverDefinition(Object bean) { + var clazz = bean.getClass(); + + var methods = ReflectionUtils.getMethodsByAnnoInPOJOClass(clazz, PacketReceiver.class); + if (ArrayUtils.isEmpty(methods)) { + return; + } + + if (!ReflectionUtils.isPojoClass(clazz)) { + logger.warn("The message registration class [{}] is not a POJO class, and the parent class will not be scanned", clazz); + } + + for (var method : methods) { + var paramClazzs = method.getParameterTypes(); + + AssertionUtils.isTrue(paramClazzs.length == 2 || paramClazzs.length == 3, "[class:{}] [method:{}] must have two or three parameter!", bean.getClass().getName(), method.getName()); + + AssertionUtils.isTrue(Session.class.isAssignableFrom(paramClazzs[0]), "[class:{}] [method:{}],the first parameter must be Session type parameter Exception.", bean.getClass().getName(), method.getName()); + + AssertionUtils.isTrue(paramClazzs.length != 3 || AttachmentType.isAttachmentClass(paramClazzs[2]), "[class:{}] [method:{}],the third parameter must be Attachment type parameter Exception.", bean.getClass().getName(), method.getName()); + + var packetClazz = paramClazzs[1]; + var attachmentClazz = paramClazzs.length == 3 ? paramClazzs[2] : null; + var packetName = packetClazz.getCanonicalName(); + var methodName = method.getName(); + + AssertionUtils.isTrue(Modifier.isPublic(method.getModifiers()), "[class:{}] [method:{}] [packet:{}] must use 'public' as modifier!", bean.getClass().getName(), methodName, packetName); + + AssertionUtils.isTrue(!Modifier.isStatic(method.getModifiers()), "[class:{}] [method:{}] [packet:{}] can not use 'static' as modifier!", bean.getClass().getName(), methodName, packetName); + + var expectedMethodName = StringUtils.format("at{}", packetClazz.getSimpleName()); + AssertionUtils.isTrue(methodName.equals(expectedMethodName), "[class:{}] [method:{}] [packet:{}] expects '{}' as method name!", bean.getClass().getName(), methodName, packetName, expectedMethodName); + + // These rules are not necessary, but can reduce us from making low-level mistakes + // If the request class name ends with Request which is for outer net client, then the attachment can not be a SignalAttachment + // If the request class name ends with Ask which is for intranet client, then attachment can not be a GatewayAttachment + if (attachmentClazz != null && packetName.endsWith(PacketService.NET_ASK_SUFFIX)) { + AssertionUtils.isTrue(!attachmentClazz.equals(GatewayAttachment.class), "[class:{}] [method:{}] [packet:{}] can not match with [attachment:{}]!", bean.getClass().getName(), methodName, packetName, GatewayAttachment.class.getCanonicalName()); + } + + var protocolId = Short.MIN_VALUE; + try { + protocolId = ProtocolManager.protocolId(packetClazz); + } catch (Exception e) { + throw new RunException("[class:{}][protocolId:{}] has no registration, please register for this protocol", packetClazz.getSimpleName(), protocolId); + } + + try { + AssertionUtils.isNull(receiverMap.get(protocolId), "duplicate protocol registration, @PacketReceiver [class:{}] is repeatedly received [at{}]", packetClazz.getSimpleName(), packetClazz.getSimpleName()); + var task = method.getDeclaredAnnotation(PacketReceiver.class).value(); + var receiverDefinition = new PacketReceiverDefinition(bean, method, task, packetClazz, attachmentClazz); + if (GraalVmUtils.isGraalVM()) { + receiverMap.put(protocolId, receiverDefinition); + } else { + var enhanceReceiverDefinition = EnhanceUtils.createPacketReceiver(receiverDefinition); + receiverMap.put(protocolId, enhanceReceiverDefinition); + } + } catch (Throwable t) { + throw new RunException("Registration protocol [class:{}] unknown exception", packetClazz.getSimpleName(), t); + } + } + } } diff --git a/net/src/main/java/com/zfoo/net/router/receiver/EnhanceUtils.java b/net/src/main/java/com/zfoo/net/router/receiver/EnhanceUtils.java index 911e49dc..1849119f 100644 --- a/net/src/main/java/com/zfoo/net/router/receiver/EnhanceUtils.java +++ b/net/src/main/java/com/zfoo/net/router/receiver/EnhanceUtils.java @@ -13,6 +13,8 @@ package com.zfoo.net.router.receiver; +import com.zfoo.event.anno.Bus; +import com.zfoo.net.anno.Task; import com.zfoo.net.session.Session; import com.zfoo.protocol.util.StringUtils; import com.zfoo.protocol.util.UuidUtils; @@ -73,6 +75,13 @@ public abstract class EnhanceUtils { } enhanceClazz.addMethod(invokeMethod); + // 定义类实现的接口方法bus + CtMethod busMethod = new CtMethod(classPool.get(Bus.class.getCanonicalName()), "task", null, enhanceClazz); + busMethod.setModifiers(Modifier.PUBLIC + Modifier.FINAL); + String busMethodBody = StringUtils.format("{ return {}.{}; }", Task.class.getCanonicalName(), definition.getTask()); + busMethod.setBody(busMethodBody); + enhanceClazz.addMethod(busMethod); + enhanceClazz.detach(); var resultClazz = enhanceClazz.toClass(IPacketReceiver.class); diff --git a/net/src/main/java/com/zfoo/net/router/receiver/IPacketReceiver.java b/net/src/main/java/com/zfoo/net/router/receiver/IPacketReceiver.java index 956edbdb..7b2c720d 100644 --- a/net/src/main/java/com/zfoo/net/router/receiver/IPacketReceiver.java +++ b/net/src/main/java/com/zfoo/net/router/receiver/IPacketReceiver.java @@ -13,6 +13,7 @@ package com.zfoo.net.router.receiver; +import com.zfoo.net.anno.Task; import com.zfoo.net.session.Session; /** @@ -20,6 +21,8 @@ import com.zfoo.net.session.Session; */ public interface IPacketReceiver { + Task task(); + void invoke(Session session, Object packet, Object attachment); } diff --git a/net/src/main/java/com/zfoo/net/router/receiver/PacketReceiverDefinition.java b/net/src/main/java/com/zfoo/net/router/receiver/PacketReceiverDefinition.java index 546d924a..287a55fd 100644 --- a/net/src/main/java/com/zfoo/net/router/receiver/PacketReceiverDefinition.java +++ b/net/src/main/java/com/zfoo/net/router/receiver/PacketReceiverDefinition.java @@ -13,6 +13,7 @@ package com.zfoo.net.router.receiver; +import com.zfoo.net.anno.Task; import com.zfoo.net.session.Session; import com.zfoo.protocol.util.ReflectionUtils; @@ -36,6 +37,11 @@ public class PacketReceiverDefinition implements IPacketReceiver { */ private Method method; + /** + * packet receiver type + */ + private Task task; + /** * The protocol class that receives the package, eg: TcpHelloRequest */ @@ -46,14 +52,20 @@ public class PacketReceiverDefinition implements IPacketReceiver { */ private Class attachmentClazz; - public PacketReceiverDefinition(Object bean, Method method, Class packetClazz, Class attachmentClazz) { + public PacketReceiverDefinition(Object bean, Method method, Task task, Class packetClazz, Class attachmentClazz) { this.bean = bean; this.method = method; + this.task = task; this.packetClazz = packetClazz; this.attachmentClazz = attachmentClazz; ReflectionUtils.makeAccessible(method); } + @Override + public Task task() { + return task; + } + @Override public void invoke(Session session, Object packet, Object attachment) { if (attachmentClazz == null) { @@ -79,6 +91,14 @@ public class PacketReceiverDefinition implements IPacketReceiver { this.method = method; } + public Task getTask() { + return task; + } + + public void setTask(Task task) { + this.task = task; + } + public Class getPacketClazz() { return packetClazz; } diff --git a/net/src/main/java/com/zfoo/net/router/route/PacketBus.java b/net/src/main/java/com/zfoo/net/router/route/PacketBus.java deleted file mode 100644 index 71a57a5b..00000000 --- a/net/src/main/java/com/zfoo/net/router/route/PacketBus.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Copyright (C) 2020 The zfoo Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and limitations under the License. - */ - -package com.zfoo.net.router.route; - -import com.zfoo.net.anno.PacketReceiver; -import com.zfoo.net.packet.PacketService; -import com.zfoo.net.router.attachment.AttachmentType; -import com.zfoo.net.router.attachment.GatewayAttachment; -import com.zfoo.net.router.attachment.SignalAttachment; -import com.zfoo.net.router.receiver.EnhanceUtils; -import com.zfoo.net.router.receiver.IPacketReceiver; -import com.zfoo.net.router.receiver.PacketReceiverDefinition; -import com.zfoo.net.session.Session; -import com.zfoo.protocol.ProtocolManager; -import com.zfoo.protocol.collection.ArrayUtils; -import com.zfoo.protocol.exception.RunException; -import com.zfoo.protocol.util.AssertionUtils; -import com.zfoo.protocol.util.GraalVmUtils; -import com.zfoo.protocol.util.ReflectionUtils; -import com.zfoo.protocol.util.StringUtils; -import io.netty.util.collection.ShortObjectHashMap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.lang.reflect.Modifier; - -/** - * EN:The receiving route of the packet, the server/client receives the packet and then call corresponding to the Receiver - * CN:包的接收路线,服务器收到packet然后调用对应的Receiver - * - * @author godotg - */ -public abstract class PacketBus { - - private static final Logger logger = LoggerFactory.getLogger(PacketBus.class); - - private static final ShortObjectHashMap receiverMap = new ShortObjectHashMap<>(); - - /** - * The routing of the message - */ - public static void route(Session session, Object packet, Object attachment) { - var receiver = receiverMap.get(ProtocolManager.protocolId(packet.getClass())); - if (receiver == null) { - var name = packet.getClass().getSimpleName(); - throw new RuntimeException(StringUtils.format("no any packetReceiver:[at{}] found for this packet:[{}] or no GatewayAttachment sent back if this server is gateway", name, name)); - } - receiver.invoke(session, packet, attachment); - } - - - public static void registerPacketReceiverDefinition(Object bean) { - var clazz = bean.getClass(); - - var methods = ReflectionUtils.getMethodsByAnnoInPOJOClass(clazz, PacketReceiver.class); - if (ArrayUtils.isEmpty(methods)) { - return; - } - - if (!ReflectionUtils.isPojoClass(clazz)) { - logger.warn("The message registration class [{}] is not a POJO class, and the parent class will not be scanned", clazz); - } - - for (var method : methods) { - var paramClazzs = method.getParameterTypes(); - - AssertionUtils.isTrue(paramClazzs.length == 2 || paramClazzs.length == 3, "[class:{}] [method:{}] must have two or three parameter!", bean.getClass().getName(), method.getName()); - - AssertionUtils.isTrue(Session.class.isAssignableFrom(paramClazzs[0]), "[class:{}] [method:{}],the first parameter must be Session type parameter Exception.", bean.getClass().getName(), method.getName()); - - AssertionUtils.isTrue(paramClazzs.length != 3 || AttachmentType.isAttachmentClass(paramClazzs[2]), "[class:{}] [method:{}],the third parameter must be Attachment type parameter Exception.", bean.getClass().getName(), method.getName()); - - var packetClazz = paramClazzs[1]; - var attachmentClazz = paramClazzs.length == 3 ? paramClazzs[2] : null; - var packetName = packetClazz.getCanonicalName(); - var methodName = method.getName(); - - AssertionUtils.isTrue(Modifier.isPublic(method.getModifiers()), "[class:{}] [method:{}] [packet:{}] must use 'public' as modifier!", bean.getClass().getName(), methodName, packetName); - - AssertionUtils.isTrue(!Modifier.isStatic(method.getModifiers()), "[class:{}] [method:{}] [packet:{}] can not use 'static' as modifier!", bean.getClass().getName(), methodName, packetName); - - var expectedMethodName = StringUtils.format("at{}", packetClazz.getSimpleName()); - AssertionUtils.isTrue(methodName.equals(expectedMethodName), "[class:{}] [method:{}] [packet:{}] expects '{}' as method name!", bean.getClass().getName(), methodName, packetName, expectedMethodName); - - // These rules are not necessary, but can reduce us from making low-level mistakes - // If the request class name ends with Request which is for outer net client, then the attachment can not be a SignalAttachment - // If the request class name ends with Ask which is for intranet client, then attachment can not be a GatewayAttachment - if (attachmentClazz != null && packetName.endsWith(PacketService.NET_ASK_SUFFIX)) { - AssertionUtils.isTrue(!attachmentClazz.equals(GatewayAttachment.class), "[class:{}] [method:{}] [packet:{}] can not match with [attachment:{}]!", bean.getClass().getName(), methodName, packetName, GatewayAttachment.class.getCanonicalName()); - } - - var protocolId = Short.MIN_VALUE; - try { - protocolId = ProtocolManager.protocolId(packetClazz); - } catch (Exception e) { - throw new RunException("[class:{}][protocolId:{}] has no registration, please register for this protocol", packetClazz.getSimpleName(), protocolId); - } - - try { - AssertionUtils.isNull(receiverMap.get(protocolId), "duplicate protocol registration, @PacketReceiver [class:{}] is repeatedly received [at{}]", packetClazz.getSimpleName(), packetClazz.getSimpleName()); - - var receiverDefinition = new PacketReceiverDefinition(bean, method, packetClazz, attachmentClazz); - if (GraalVmUtils.isGraalVM()) { - receiverMap.put(protocolId, receiverDefinition); - } else { - var enhanceReceiverDefinition = EnhanceUtils.createPacketReceiver(receiverDefinition); - receiverMap.put(protocolId, enhanceReceiverDefinition); - } - } catch (Throwable t) { - throw new RunException("Registration protocol [class:{}] unknown exception", packetClazz.getSimpleName(), t); - } - } - } - -} diff --git a/net/src/main/java/com/zfoo/net/task/PacketReceiverTask.java b/net/src/main/java/com/zfoo/net/task/PacketReceiverTask.java index 54ee6d9b..03c8bafc 100644 --- a/net/src/main/java/com/zfoo/net/task/PacketReceiverTask.java +++ b/net/src/main/java/com/zfoo/net/task/PacketReceiverTask.java @@ -32,7 +32,7 @@ public final class PacketReceiverTask implements Runnable { @Override public void run() { - NetContext.getRouter().atReceiver(session, packet, attachment); + NetContext.getRouter().atReceiver(this); } diff --git a/net/src/main/java/com/zfoo/net/task/TaskBus.java b/net/src/main/java/com/zfoo/net/task/TaskBus.java index 6070346d..89ae60e7 100644 --- a/net/src/main/java/com/zfoo/net/task/TaskBus.java +++ b/net/src/main/java/com/zfoo/net/task/TaskBus.java @@ -92,37 +92,6 @@ public final class TaskBus { } } - /** - * Actor模型,最主要的就是线程模型,Actor模型保证了某个Actor所代表的任务永远不会同时在两条线程同时处理任务,这就避免了并发。 - * 无论是Java,Kotlin,Scala都没有真正的协程,所以最终做到Actor模型的只能是细致的控制线程。 - *

- * zfoo中通过对线程池的细粒度控制,从而实现了Actor模型。 - * 为了简单,可以把Actor可以理解为一个用户或者一个玩家。 - * 因为同一个用户或者玩家的uid是固定的,通过uid去计算一致性hash(taskExecutorHash)永远会得到一致的结果, - * 从而保证同一个用户或者玩家的请求总能通过taskExecutorHash被路由到同一台服务器的同一个线程去执行,从而避免了并发,实现了无锁化。 - *

- * zfoo所代表的Actor模型,是更加精简的Actor模型,让上层调用无感知,在zfoo中可以简单的理解 actor = taskExecutorHash。 - *

- * 在zfoo这套线程模型中,保证了服务器所接收到的Packet(最终被包装成PacketReceiverTask任务),永远只会在同一条线程处理, - * TaskBus通过AbstractTaskDispatch去派发PacketReceiverTask任务,具体在哪个线程处理通过IAttachment的taskExecutorHash计算。 - *

- * IAttachment的不同,taskExecutorHash也不同: - * GatewayAttachment:默认是taskExecutorHash等于用户活玩家的uid,也可以通过IGatewayLoadBalancer接口指定 - * SignalAttachment:taskExecutorHash通过IRouter和IConsumer的argument参数指定 - */ - public static void dispatchBySession(PacketReceiverTask task) { - var session = task.getSession(); - var uid = session.getUid(); - if (uid > 0) { - execute((int) uid, task); - } else { - execute((int) session.getSid(), task); - } - } - - public static void dispatchByTaskExecutorHash(int taskExecutorHash, PacketReceiverTask task) { - execute(taskExecutorHash, task); - } private static int calTaskExecutorIndex(int taskExecutorHash) { // Other hash algorithms can be customized to make the distribution more uniform