ref[attachment]: refactor attachment of net

This commit is contained in:
godotg
2023-04-14 23:28:12 +08:00
parent f763246231
commit 384b79d58d
@@ -64,7 +64,7 @@ public class Router implements IRouter {
* atReceiver会设置attachment,但是在方法调用完成会取消,不需要过多关注。
* asyncAsk会再次设置attachment,需要重点关注。
*/
private final FastThreadLocal<SignalAttachment> serverReceiveSignalAttachmentThreadLocal = new FastThreadLocal<>();
private final FastThreadLocal<IAttachment> serverReceiverAttachmentThreadLocal = new FastThreadLocal<>();
/**
* 在服务端收到数据后,会调用这个方法. 这个方法在BaseRouteHandler.java的channelRead中被调用
@@ -85,7 +85,6 @@ public class Router implements IRouter {
if (signalAttachment.isClient()) {
// 服务器收到signalAttachment,不做任何处理
signalAttachment.setClient(false);
} else {
// 客户端收到服务器应答,客户端发送的时候isClient为true,服务器收到的时候将其设置为false
var removedAttachment = (SignalAttachment) SignalBridge.removeSignalAttachment(signalAttachment);
@@ -95,11 +94,9 @@ public class Router implements IRouter {
} else {
logger.error("client receives packet:[{}] and attachment:[{}] from server, but clientAttachmentMap has no attachment, perhaps timeout exception.", JsonUtils.object2String(packet), JsonUtils.object2String(attachment));
}
// 注意:这个return,这样子,asyncAsk的结果就返回了。
return;
}
break;
case GATEWAY_PACKET:
var gatewayAttachment = (GatewayAttachment) attachment;
@@ -171,18 +168,7 @@ public class Router implements IRouter {
@Override
public void send(Session session, IPacket packet) {
// 服务器异步返回的消息的发送会有signalAttachment,验证返回的消息是否满足
var serverSignalAttachment = serverReceiveSignalAttachmentThreadLocal.get();
if (serverSignalAttachment != null) {
if (serverSignalAttachment.isClient()) {
// 客户端发送的时候不应该有serverSignalAttachment
logger.error("client can not have serverSignalAttachment:[{}] and packet:[{}]", serverSignalAttachment, packet);
} else if (Error.errorProtocolId() == packet.protocolId()) {
// 错误信息直接返回
}
}
var serverSignalAttachment = serverReceiverAttachmentThreadLocal.get();
send(session, packet, serverSignalAttachment);
}
@@ -230,7 +216,7 @@ public class Router implements IRouter {
clientSignalAttachment.setTaskExecutorHash(taskExecutorHash);
// 服务器在同步或异步的消息处理中,又调用了同步或异步的方法,这时候threadReceiverAttachment不为空
var serverSignalAttachment = serverReceiveSignalAttachmentThreadLocal.get();
var serverSignalAttachment = serverReceiverAttachmentThreadLocal.get();
try {
var asyncAnswer = new AsyncAnswer<T>();
@@ -258,7 +244,7 @@ public class Router implements IRouter {
// 接收者在同步或异步的消息处理中,又调用了异步的方法,这时候threadServerAttachment不为空
if (serverSignalAttachment != null) {
serverReceiveSignalAttachmentThreadLocal.set(serverSignalAttachment);
serverReceiverAttachmentThreadLocal.set(serverSignalAttachment);
}
// 如果有异常的话,whenCompleteAsync的下一个thenAccept不会执行
@@ -279,7 +265,7 @@ public class Router implements IRouter {
logger.error("Asynchronous callback method [ask:{}][answer:{}] error", packet.getClass().getSimpleName(), answer.getClass().getSimpleName(), throwable1);
} finally {
if (serverSignalAttachment != null) {
serverReceiveSignalAttachmentThreadLocal.set(null);
serverReceiverAttachmentThreadLocal.set(null);
}
}
@@ -309,13 +295,7 @@ public class Router implements IRouter {
try {
// 接收者(服务器)同步和异步消息的接收
if (attachment != null) {
switch (attachment.packetType()) {
case SIGNAL_PACKET:
serverReceiveSignalAttachmentThreadLocal.set((SignalAttachment) attachment);
break;
default:
break;
}
serverReceiverAttachmentThreadLocal.set(attachment);
}
// 调用PacketReceiver,进行真正的业务处理,这个submit只是根据packet找到protocolId,然后调用对应的消息处理方法
@@ -329,13 +309,7 @@ public class Router implements IRouter {
} finally {
// 如果有服务器在处理同步或者异步消息的时候由于错误没有返回给客户端消息,则可能会残留serverAttachment,所以先移除
if (attachment != null) {
switch (attachment.packetType()) {
case SIGNAL_PACKET:
serverReceiveSignalAttachmentThreadLocal.set(null);
break;
default:
break;
}
serverReceiverAttachmentThreadLocal.set(null);
}
}
}