feat[virtual thread]: support virtual thread in net

This commit is contained in:
sun
2023-09-20 16:50:52 +08:00
parent 162e59944e
commit d0cb75bf4a
11 changed files with 203 additions and 176 deletions
@@ -25,4 +25,5 @@ import java.lang.annotation.*;
@Target({ElementType.METHOD})
@Reflective
public @interface PacketReceiver {
Task value() default Task.TaskBus;
}
@@ -0,0 +1,27 @@
/*
* Copyright (C) 2020 The zfoo Authors
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
*/
package com.zfoo.net.anno;
/**
* task thread type
*
* @author godotg
*/
public enum Task {
TaskBus,
NettyIO,
VirtualThread;
}
@@ -14,7 +14,6 @@ package com.zfoo.net.packet;
import com.zfoo.net.NetContext;
import com.zfoo.net.router.attachment.SignalAttachment;
import com.zfoo.net.router.route.PacketBus;
import com.zfoo.protocol.ProtocolManager;
import com.zfoo.protocol.buffer.ByteBufUtils;
import com.zfoo.protocol.collection.CollectionUtils;
@@ -121,7 +120,7 @@ public class PacketService implements IPacketService {
// 注册协议接收器
var componentBeans = applicationContext.getBeansWithAnnotation(Component.class);
for (var bean : componentBeans.values()) {
PacketBus.registerPacketReceiverDefinition(bean);
NetContext.getRouter().registerPacketReceiverDefinition(bean);
}
}
@@ -16,6 +16,7 @@ package com.zfoo.net.router;
import com.zfoo.net.router.answer.AsyncAnswer;
import com.zfoo.net.router.answer.SyncAnswer;
import com.zfoo.net.session.Session;
import com.zfoo.net.task.PacketReceiverTask;
import org.springframework.lang.Nullable;
/**
@@ -33,7 +34,9 @@ public interface IRouter {
void receive(Session session, Object packet, @Nullable Object attachment);
void atReceiver(Session session, Object packet, @Nullable Object attachment);
void atReceiver(PacketReceiverTask packetReceiverTask);
void registerPacketReceiverDefinition(Object bean);
/**
* attentionsyncAsk和asyncAsk只能客户端调用
+136 -14
View File
@@ -15,34 +15,43 @@ package com.zfoo.net.router;
import com.zfoo.event.manager.EventBus;
import com.zfoo.net.NetContext;
import com.zfoo.net.anno.PacketReceiver;
import com.zfoo.net.core.event.ServerExceptionEvent;
import com.zfoo.net.core.gateway.model.AuthUidToGatewayCheck;
import com.zfoo.net.core.gateway.model.AuthUidToGatewayConfirm;
import com.zfoo.net.core.gateway.model.AuthUidToGatewayEvent;
import com.zfoo.net.packet.EncodedPacketInfo;
import com.zfoo.net.packet.PacketService;
import com.zfoo.net.packet.common.Error;
import com.zfoo.net.packet.common.Heartbeat;
import com.zfoo.net.router.answer.AsyncAnswer;
import com.zfoo.net.router.answer.SyncAnswer;
import com.zfoo.net.router.attachment.AttachmentType;
import com.zfoo.net.router.attachment.GatewayAttachment;
import com.zfoo.net.router.attachment.HttpAttachment;
import com.zfoo.net.router.attachment.SignalAttachment;
import com.zfoo.net.router.exception.ErrorResponseException;
import com.zfoo.net.router.exception.NetTimeOutException;
import com.zfoo.net.router.exception.UnexpectedProtocolException;
import com.zfoo.net.router.route.PacketBus;
import com.zfoo.net.router.receiver.EnhanceUtils;
import com.zfoo.net.router.receiver.IPacketReceiver;
import com.zfoo.net.router.receiver.PacketReceiverDefinition;
import com.zfoo.net.router.route.SignalBridge;
import com.zfoo.net.session.Session;
import com.zfoo.net.task.PacketReceiverTask;
import com.zfoo.net.task.TaskBus;
import com.zfoo.protocol.ProtocolManager;
import com.zfoo.protocol.collection.ArrayUtils;
import com.zfoo.protocol.exception.ExceptionUtils;
import com.zfoo.protocol.util.JsonUtils;
import com.zfoo.protocol.util.StringUtils;
import com.zfoo.protocol.exception.RunException;
import com.zfoo.protocol.util.*;
import io.netty.util.collection.ShortObjectHashMap;
import io.netty.util.concurrent.FastThreadLocal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.lang.Nullable;
import java.lang.reflect.Modifier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -57,6 +66,8 @@ public class Router implements IRouter {
public static final long DEFAULT_TIMEOUT = 3000;
private final ShortObjectHashMap<IPacketReceiver> receiverMap = new ShortObjectHashMap<>();
/**
* 作为服务器接收方,会把receive收到的attachment存储在这个地方,只针对task线程。
* atReceiver会设置attachment,但是在方法调用完成会取消,不需要过多关注。
@@ -78,7 +89,7 @@ public class Router implements IRouter {
if (attachment == null) {
// 正常发送消息的接收,把客户端的业务请求包装下到路由策略指定的线程进行业务处理
// 注意:像客户端以asyncAsk发送请求,在服务器处理完后返回结果,在请求方也是进入这个receive方法,但是attachment不为空,会提前return掉不会走到这
TaskBus.dispatchBySession(task);
dispatchBySession(task);
return;
}
@@ -88,13 +99,13 @@ public class Router implements IRouter {
if (signalAttachment.getClient() == SignalAttachment.SIGNAL_OUTSIDE_CLIENT) {
// 服务器收到外部客户端的SIGNAL_OUTSIDE_CLIENT,不做任何处理
TaskBus.dispatchBySession(task);
dispatchBySession(task);
} else if (signalAttachment.getClient() == SignalAttachment.SIGNAL_NATIVE_ARGUMENT_CLIENT) {
signalAttachment.setClient(SignalAttachment.SIGNAL_SERVER);
TaskBus.dispatchByTaskExecutorHash(signalAttachment.getTaskExecutorHash(), task);
dispatchByTaskExecutorHash(signalAttachment.getTaskExecutorHash(), task);
} else if (signalAttachment.getClient() == SignalAttachment.SIGNAL_NATIVE_NO_ARGUMENT_CLIENT) {
signalAttachment.setClient(SignalAttachment.SIGNAL_SERVER);
TaskBus.dispatchBySession(task);
dispatchBySession(task);
} else {
// 客户端收到服务器应答,客户端发送的时候client为SIGNAL_NATIVE_CLIENT,服务器收到的时候将其设置为SIGNAL_SERVER
var removedAttachment = (SignalAttachment) SignalBridge.removeSignalAttachment(signalAttachment);
@@ -116,7 +127,7 @@ public class Router implements IRouter {
// 注意:此时并没有return,这样子网关的消息才能发给home,在home进行处理LogoutRequest消息的处理
if (gatewayAttachment.isClient()) {
gatewayAttachment.setClient(false);
TaskBus.dispatchByTaskExecutorHash(gatewayAttachment.taskExecutorHash(), task);
dispatchByTaskExecutorHash(gatewayAttachment.taskExecutorHash(), task);
} else {
// 这里是:别的服务提供者提供授权给网关,比如:在玩家登录后,home服查到了玩家uid,然后发给Gateway服
var gatewaySession = NetContext.getSessionManager().getServerSession(gatewayAttachment.getSid());
@@ -146,11 +157,55 @@ public class Router implements IRouter {
if (attachment.getClass() == HttpAttachment.class) {
var httpAttachment = (HttpAttachment) attachment;
TaskBus.dispatchByTaskExecutorHash(httpAttachment.getTaskExecutorHash(), task);
dispatchByTaskExecutorHash(httpAttachment.getTaskExecutorHash(), task);
return;
}
TaskBus.dispatchBySession(task);
dispatchBySession(task);
}
/**
* Actor模型,最主要的就是线程模型,Actor模型保证了某个Actor所代表的任务永远不会同时在两条线程同时处理任务,这就避免了并发。
* 无论是JavaKotlin,Scala都没有真正的协程,所以最终做到Actor模型的只能是细致的控制线程。
* <p>
* zfoo中通过对线程池的细粒度控制,从而实现了Actor模型。
* 为了简单,可以把Actor可以理解为一个用户或者一个玩家。
* 因为同一个用户或者玩家的uid是固定的,通过uid去计算一致性hashtaskExecutorHash)永远会得到一致的结果,
* 从而保证同一个用户或者玩家的请求总能通过taskExecutorHash被路由到同一台服务器的同一个线程去执行,从而避免了并发,实现了无锁化。
* <p>
* zfoo所代表的Actor模型,是更加精简的Actor模型,让上层调用无感知,在zfoo中可以简单的理解 actor = taskExecutorHash。
* <p>
* 在zfoo这套线程模型中,保证了服务器所接收到的Packet(最终被包装成PacketReceiverTask任务),永远只会在同一条线程处理,
* TaskBus通过AbstractTaskDispatch去派发PacketReceiverTask任务,具体在哪个线程处理通过IAttachment的taskExecutorHash计算。
* <p>
* IAttachment的不同,taskExecutorHash也不同:
* GatewayAttachment:默认是taskExecutorHash等于用户活玩家的uid,也可以通过IGatewayLoadBalancer接口指定
* SignalAttachmenttaskExecutorHash通过IRouter和IConsumer的argument参数指定
*/
public void dispatchBySession(PacketReceiverTask task) {
var session = task.getSession();
var uid = session.getUid();
if (uid > 0) {
dispatchByTaskExecutorHash((int) uid, task);
} else {
dispatchByTaskExecutorHash((int) session.getSid(), task);
}
}
public void dispatchByTaskExecutorHash(int taskExecutorHash, PacketReceiverTask packetReceiverTask) {
var packet = packetReceiverTask.getPacket();
var clazz = packet.getClass();
var receiver = receiverMap.get(ProtocolManager.protocolId(clazz));
if (receiver == null) {
var name = packet.getClass().getSimpleName();
throw new RuntimeException(StringUtils.format("no any packetReceiver:[at{}] found for this packet:[{}] or no GatewayAttachment sent back if this server is gateway", name, name));
}
switch (receiver.task()) {
case TaskBus -> TaskBus.execute(taskExecutorHash, packetReceiverTask);
case NettyIO -> atReceiver(packetReceiverTask);
case VirtualThread -> Thread.ofVirtual().name("virtual-at" + clazz.getSimpleName()).start(() -> atReceiver(packetReceiverTask));
}
}
@Override
@@ -312,16 +367,19 @@ public class Router implements IRouter {
* 接收者同时只能处理一个session的一个包,同一个发送者发送过来的包排队处理
*/
@Override
public void atReceiver(Session session, Object packet, Object attachment) {
public void atReceiver(PacketReceiverTask packetReceiverTask) {
var session = packetReceiverTask.getSession();
var packet = packetReceiverTask.getPacket();
var attachment = packetReceiverTask.getAttachment();
try {
// 接收者(服务器)同步和异步消息的接收
if (attachment != null) {
serverReceiverAttachmentThreadLocal.set(attachment);
}
// 调用PacketReceiver,进行真正的业务处理,这个submit只是根据packet找到protocolId,然后调用对应的消息处理方法
// 这个在哪个线程处理取决于:这个上层的PacketReceiverTask被丢到了哪个线程中
PacketBus.route(session, packet, attachment);
// The routing of the message
var receiver = receiverMap.get(ProtocolManager.protocolId(packet.getClass()));
receiver.invoke(session, packet, attachment);
} catch (Exception e) {
EventBus.post(ServerExceptionEvent.valueOf(session, packet, attachment, e));
logger.error(StringUtils.format("e[uid:{}][sid:{}] unknown exception", session.getUid(), session.getSid(), e.getMessage()), e);
@@ -335,4 +393,68 @@ public class Router implements IRouter {
}
}
@Override
public void registerPacketReceiverDefinition(Object bean) {
var clazz = bean.getClass();
var methods = ReflectionUtils.getMethodsByAnnoInPOJOClass(clazz, PacketReceiver.class);
if (ArrayUtils.isEmpty(methods)) {
return;
}
if (!ReflectionUtils.isPojoClass(clazz)) {
logger.warn("The message registration class [{}] is not a POJO class, and the parent class will not be scanned", clazz);
}
for (var method : methods) {
var paramClazzs = method.getParameterTypes();
AssertionUtils.isTrue(paramClazzs.length == 2 || paramClazzs.length == 3, "[class:{}] [method:{}] must have two or three parameter!", bean.getClass().getName(), method.getName());
AssertionUtils.isTrue(Session.class.isAssignableFrom(paramClazzs[0]), "[class:{}] [method:{}],the first parameter must be Session type parameter Exception.", bean.getClass().getName(), method.getName());
AssertionUtils.isTrue(paramClazzs.length != 3 || AttachmentType.isAttachmentClass(paramClazzs[2]), "[class:{}] [method:{}],the third parameter must be Attachment type parameter Exception.", bean.getClass().getName(), method.getName());
var packetClazz = paramClazzs[1];
var attachmentClazz = paramClazzs.length == 3 ? paramClazzs[2] : null;
var packetName = packetClazz.getCanonicalName();
var methodName = method.getName();
AssertionUtils.isTrue(Modifier.isPublic(method.getModifiers()), "[class:{}] [method:{}] [packet:{}] must use 'public' as modifier!", bean.getClass().getName(), methodName, packetName);
AssertionUtils.isTrue(!Modifier.isStatic(method.getModifiers()), "[class:{}] [method:{}] [packet:{}] can not use 'static' as modifier!", bean.getClass().getName(), methodName, packetName);
var expectedMethodName = StringUtils.format("at{}", packetClazz.getSimpleName());
AssertionUtils.isTrue(methodName.equals(expectedMethodName), "[class:{}] [method:{}] [packet:{}] expects '{}' as method name!", bean.getClass().getName(), methodName, packetName, expectedMethodName);
// These rules are not necessary, but can reduce us from making low-level mistakes
// If the request class name ends with Request which is for outer net client, then the attachment can not be a SignalAttachment
// If the request class name ends with Ask which is for intranet client, then attachment can not be a GatewayAttachment
if (attachmentClazz != null && packetName.endsWith(PacketService.NET_ASK_SUFFIX)) {
AssertionUtils.isTrue(!attachmentClazz.equals(GatewayAttachment.class), "[class:{}] [method:{}] [packet:{}] can not match with [attachment:{}]!", bean.getClass().getName(), methodName, packetName, GatewayAttachment.class.getCanonicalName());
}
var protocolId = Short.MIN_VALUE;
try {
protocolId = ProtocolManager.protocolId(packetClazz);
} catch (Exception e) {
throw new RunException("[class:{}][protocolId:{}] has no registration, please register for this protocol", packetClazz.getSimpleName(), protocolId);
}
try {
AssertionUtils.isNull(receiverMap.get(protocolId), "duplicate protocol registration, @PacketReceiver [class:{}] is repeatedly received [at{}]", packetClazz.getSimpleName(), packetClazz.getSimpleName());
var task = method.getDeclaredAnnotation(PacketReceiver.class).value();
var receiverDefinition = new PacketReceiverDefinition(bean, method, task, packetClazz, attachmentClazz);
if (GraalVmUtils.isGraalVM()) {
receiverMap.put(protocolId, receiverDefinition);
} else {
var enhanceReceiverDefinition = EnhanceUtils.createPacketReceiver(receiverDefinition);
receiverMap.put(protocolId, enhanceReceiverDefinition);
}
} catch (Throwable t) {
throw new RunException("Registration protocol [class:{}] unknown exception", packetClazz.getSimpleName(), t);
}
}
}
}
@@ -13,6 +13,8 @@
package com.zfoo.net.router.receiver;
import com.zfoo.event.anno.Bus;
import com.zfoo.net.anno.Task;
import com.zfoo.net.session.Session;
import com.zfoo.protocol.util.StringUtils;
import com.zfoo.protocol.util.UuidUtils;
@@ -73,6 +75,13 @@ public abstract class EnhanceUtils {
}
enhanceClazz.addMethod(invokeMethod);
// 定义类实现的接口方法bus
CtMethod busMethod = new CtMethod(classPool.get(Bus.class.getCanonicalName()), "task", null, enhanceClazz);
busMethod.setModifiers(Modifier.PUBLIC + Modifier.FINAL);
String busMethodBody = StringUtils.format("{ return {}.{}; }", Task.class.getCanonicalName(), definition.getTask());
busMethod.setBody(busMethodBody);
enhanceClazz.addMethod(busMethod);
enhanceClazz.detach();
var resultClazz = enhanceClazz.toClass(IPacketReceiver.class);
@@ -13,6 +13,7 @@
package com.zfoo.net.router.receiver;
import com.zfoo.net.anno.Task;
import com.zfoo.net.session.Session;
/**
@@ -20,6 +21,8 @@ import com.zfoo.net.session.Session;
*/
public interface IPacketReceiver {
Task task();
void invoke(Session session, Object packet, Object attachment);
}
@@ -13,6 +13,7 @@
package com.zfoo.net.router.receiver;
import com.zfoo.net.anno.Task;
import com.zfoo.net.session.Session;
import com.zfoo.protocol.util.ReflectionUtils;
@@ -36,6 +37,11 @@ public class PacketReceiverDefinition implements IPacketReceiver {
*/
private Method method;
/**
* packet receiver type
*/
private Task task;
/**
* The protocol class that receives the package, eg: TcpHelloRequest
*/
@@ -46,14 +52,20 @@ public class PacketReceiverDefinition implements IPacketReceiver {
*/
private Class<?> attachmentClazz;
public PacketReceiverDefinition(Object bean, Method method, Class<?> packetClazz, Class<?> attachmentClazz) {
public PacketReceiverDefinition(Object bean, Method method, Task task, Class<?> packetClazz, Class<?> attachmentClazz) {
this.bean = bean;
this.method = method;
this.task = task;
this.packetClazz = packetClazz;
this.attachmentClazz = attachmentClazz;
ReflectionUtils.makeAccessible(method);
}
@Override
public Task task() {
return task;
}
@Override
public void invoke(Session session, Object packet, Object attachment) {
if (attachmentClazz == null) {
@@ -79,6 +91,14 @@ public class PacketReceiverDefinition implements IPacketReceiver {
this.method = method;
}
public Task getTask() {
return task;
}
public void setTask(Task task) {
this.task = task;
}
public Class<?> getPacketClazz() {
return packetClazz;
}
@@ -1,126 +0,0 @@
/*
* Copyright (C) 2020 The zfoo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
*/
package com.zfoo.net.router.route;
import com.zfoo.net.anno.PacketReceiver;
import com.zfoo.net.packet.PacketService;
import com.zfoo.net.router.attachment.AttachmentType;
import com.zfoo.net.router.attachment.GatewayAttachment;
import com.zfoo.net.router.attachment.SignalAttachment;
import com.zfoo.net.router.receiver.EnhanceUtils;
import com.zfoo.net.router.receiver.IPacketReceiver;
import com.zfoo.net.router.receiver.PacketReceiverDefinition;
import com.zfoo.net.session.Session;
import com.zfoo.protocol.ProtocolManager;
import com.zfoo.protocol.collection.ArrayUtils;
import com.zfoo.protocol.exception.RunException;
import com.zfoo.protocol.util.AssertionUtils;
import com.zfoo.protocol.util.GraalVmUtils;
import com.zfoo.protocol.util.ReflectionUtils;
import com.zfoo.protocol.util.StringUtils;
import io.netty.util.collection.ShortObjectHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Modifier;
/**
* EN:The receiving route of the packet, the server/client receives the packet and then call corresponding to the Receiver
* CN:包的接收路线,服务器收到packet然后调用对应的Receiver
*
* @author godotg
*/
public abstract class PacketBus {
private static final Logger logger = LoggerFactory.getLogger(PacketBus.class);
private static final ShortObjectHashMap<IPacketReceiver> receiverMap = new ShortObjectHashMap<>();
/**
* The routing of the message
*/
public static void route(Session session, Object packet, Object attachment) {
var receiver = receiverMap.get(ProtocolManager.protocolId(packet.getClass()));
if (receiver == null) {
var name = packet.getClass().getSimpleName();
throw new RuntimeException(StringUtils.format("no any packetReceiver:[at{}] found for this packet:[{}] or no GatewayAttachment sent back if this server is gateway", name, name));
}
receiver.invoke(session, packet, attachment);
}
public static void registerPacketReceiverDefinition(Object bean) {
var clazz = bean.getClass();
var methods = ReflectionUtils.getMethodsByAnnoInPOJOClass(clazz, PacketReceiver.class);
if (ArrayUtils.isEmpty(methods)) {
return;
}
if (!ReflectionUtils.isPojoClass(clazz)) {
logger.warn("The message registration class [{}] is not a POJO class, and the parent class will not be scanned", clazz);
}
for (var method : methods) {
var paramClazzs = method.getParameterTypes();
AssertionUtils.isTrue(paramClazzs.length == 2 || paramClazzs.length == 3, "[class:{}] [method:{}] must have two or three parameter!", bean.getClass().getName(), method.getName());
AssertionUtils.isTrue(Session.class.isAssignableFrom(paramClazzs[0]), "[class:{}] [method:{}],the first parameter must be Session type parameter Exception.", bean.getClass().getName(), method.getName());
AssertionUtils.isTrue(paramClazzs.length != 3 || AttachmentType.isAttachmentClass(paramClazzs[2]), "[class:{}] [method:{}],the third parameter must be Attachment type parameter Exception.", bean.getClass().getName(), method.getName());
var packetClazz = paramClazzs[1];
var attachmentClazz = paramClazzs.length == 3 ? paramClazzs[2] : null;
var packetName = packetClazz.getCanonicalName();
var methodName = method.getName();
AssertionUtils.isTrue(Modifier.isPublic(method.getModifiers()), "[class:{}] [method:{}] [packet:{}] must use 'public' as modifier!", bean.getClass().getName(), methodName, packetName);
AssertionUtils.isTrue(!Modifier.isStatic(method.getModifiers()), "[class:{}] [method:{}] [packet:{}] can not use 'static' as modifier!", bean.getClass().getName(), methodName, packetName);
var expectedMethodName = StringUtils.format("at{}", packetClazz.getSimpleName());
AssertionUtils.isTrue(methodName.equals(expectedMethodName), "[class:{}] [method:{}] [packet:{}] expects '{}' as method name!", bean.getClass().getName(), methodName, packetName, expectedMethodName);
// These rules are not necessary, but can reduce us from making low-level mistakes
// If the request class name ends with Request which is for outer net client, then the attachment can not be a SignalAttachment
// If the request class name ends with Ask which is for intranet client, then attachment can not be a GatewayAttachment
if (attachmentClazz != null && packetName.endsWith(PacketService.NET_ASK_SUFFIX)) {
AssertionUtils.isTrue(!attachmentClazz.equals(GatewayAttachment.class), "[class:{}] [method:{}] [packet:{}] can not match with [attachment:{}]!", bean.getClass().getName(), methodName, packetName, GatewayAttachment.class.getCanonicalName());
}
var protocolId = Short.MIN_VALUE;
try {
protocolId = ProtocolManager.protocolId(packetClazz);
} catch (Exception e) {
throw new RunException("[class:{}][protocolId:{}] has no registration, please register for this protocol", packetClazz.getSimpleName(), protocolId);
}
try {
AssertionUtils.isNull(receiverMap.get(protocolId), "duplicate protocol registration, @PacketReceiver [class:{}] is repeatedly received [at{}]", packetClazz.getSimpleName(), packetClazz.getSimpleName());
var receiverDefinition = new PacketReceiverDefinition(bean, method, packetClazz, attachmentClazz);
if (GraalVmUtils.isGraalVM()) {
receiverMap.put(protocolId, receiverDefinition);
} else {
var enhanceReceiverDefinition = EnhanceUtils.createPacketReceiver(receiverDefinition);
receiverMap.put(protocolId, enhanceReceiverDefinition);
}
} catch (Throwable t) {
throw new RunException("Registration protocol [class:{}] unknown exception", packetClazz.getSimpleName(), t);
}
}
}
}
@@ -32,7 +32,7 @@ public final class PacketReceiverTask implements Runnable {
@Override
public void run() {
NetContext.getRouter().atReceiver(session, packet, attachment);
NetContext.getRouter().atReceiver(this);
}
@@ -92,37 +92,6 @@ public final class TaskBus {
}
}
/**
* Actor模型,最主要的就是线程模型,Actor模型保证了某个Actor所代表的任务永远不会同时在两条线程同时处理任务,这就避免了并发。
* 无论是JavaKotlin,Scala都没有真正的协程,所以最终做到Actor模型的只能是细致的控制线程。
* <p>
* zfoo中通过对线程池的细粒度控制,从而实现了Actor模型。
* 为了简单,可以把Actor可以理解为一个用户或者一个玩家。
* 因为同一个用户或者玩家的uid是固定的,通过uid去计算一致性hashtaskExecutorHash)永远会得到一致的结果,
* 从而保证同一个用户或者玩家的请求总能通过taskExecutorHash被路由到同一台服务器的同一个线程去执行,从而避免了并发,实现了无锁化。
* <p>
* zfoo所代表的Actor模型,是更加精简的Actor模型,让上层调用无感知,在zfoo中可以简单的理解 actor = taskExecutorHash。
* <p>
* 在zfoo这套线程模型中,保证了服务器所接收到的Packet(最终被包装成PacketReceiverTask任务),永远只会在同一条线程处理,
* TaskBus通过AbstractTaskDispatch去派发PacketReceiverTask任务,具体在哪个线程处理通过IAttachment的taskExecutorHash计算。
* <p>
* IAttachment的不同,taskExecutorHash也不同:
* GatewayAttachment:默认是taskExecutorHash等于用户活玩家的uid,也可以通过IGatewayLoadBalancer接口指定
* SignalAttachmenttaskExecutorHash通过IRouter和IConsumer的argument参数指定
*/
public static void dispatchBySession(PacketReceiverTask task) {
var session = task.getSession();
var uid = session.getUid();
if (uid > 0) {
execute((int) uid, task);
} else {
execute((int) session.getSid(), task);
}
}
public static void dispatchByTaskExecutorHash(int taskExecutorHash, PacketReceiverTask task) {
execute(taskExecutorHash, task);
}
private static int calTaskExecutorIndex(int taskExecutorHash) {
// Other hash algorithms can be customized to make the distribution more uniform