del[protocol]: deprecate protocolId() method

This commit is contained in:
godotg
2023-09-03 15:20:48 +08:00
parent da31160d60
commit 7c1f3ffdc0
11 changed files with 24 additions and 26 deletions
@@ -74,7 +74,7 @@ public class Consumer implements IConsumer {
@Override
public void send(IPacket packet, Object argument) {
try {
var loadBalancer = loadBalancer(ProtocolManager.moduleByProtocolId(packet.protocolId()));
var loadBalancer = loadBalancer(ProtocolManager.moduleByProtocolId(ProtocolManager.protocolId(packet.getClass())));
var session = loadBalancer.loadBalancer(packet, argument);
var taskExecutorHash = TaskBus.calTaskExecutorHash(argument);
NetContext.getRouter().send(session, packet, NoAnswerAttachment.valueOf(taskExecutorHash));
@@ -85,7 +85,7 @@ public class Consumer implements IConsumer {
@Override
public <T extends IPacket> SyncAnswer<T> syncAsk(IPacket packet, Class<T> answerClass, Object argument) throws Exception {
var loadBalancer = loadBalancer(ProtocolManager.moduleByProtocolId(packet.protocolId()));
var loadBalancer = loadBalancer(ProtocolManager.moduleByProtocolId(ProtocolManager.protocolId(packet.getClass())));
var session = loadBalancer.loadBalancer(packet, argument);
@@ -124,7 +124,7 @@ public class Consumer implements IConsumer {
@Override
public <T extends IPacket> AsyncAnswer<T> asyncAsk(IPacket packet, Class<T> answerClass, Object argument) {
var loadBalancer = loadBalancer(ProtocolManager.moduleByProtocolId(packet.protocolId()));
var loadBalancer = loadBalancer(ProtocolManager.moduleByProtocolId(ProtocolManager.protocolId(packet.getClass())));
var session = loadBalancer.loadBalancer(packet, argument);
var asyncAnswer = NetContext.getRouter().asyncAsk(session, packet, answerClass, argument);
@@ -25,7 +25,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.stream.Collectors;
/**
* @author godotg
@@ -49,7 +48,7 @@ public abstract class AbstractConsumerLoadBalancer implements IConsumerLoadBalan
}
public List<Session> getSessionsByPacket(IPacket packet) {
return getSessionsByModule(ProtocolManager.moduleByProtocolId(packet.protocolId()));
return getSessionsByModule(ProtocolManager.moduleByProtocolId(ProtocolManager.protocolId(packet.getClass())));
}
public List<Session> getSessionsByModule(ProtocolModule module) {
@@ -80,7 +79,7 @@ public abstract class AbstractConsumerLoadBalancer implements IConsumerLoadBalan
return false;
}
var module = ProtocolManager.moduleByProtocolId(packet.protocolId());
var module = ProtocolManager.moduleByProtocolId(ProtocolManager.protocolId(packet.getClass()));
return registerVO.getProviderConfig().getProviders().contains(module);
}
}
@@ -15,6 +15,7 @@ package com.zfoo.net.consumer.balancer;
import com.zfoo.net.NetContext;
import com.zfoo.net.session.Session;
import com.zfoo.net.util.ConsistentHash;
import com.zfoo.net.util.FastTreeMapIntLong;
import com.zfoo.protocol.IPacket;
import com.zfoo.protocol.ProtocolManager;
@@ -22,7 +23,6 @@ import com.zfoo.protocol.collection.CollectionUtils;
import com.zfoo.protocol.exception.RunException;
import com.zfoo.protocol.model.Pair;
import com.zfoo.protocol.registration.ProtocolModule;
import com.zfoo.net.util.ConsistentHash;
import org.springframework.lang.Nullable;
import java.util.TreeMap;
@@ -79,22 +79,22 @@ public class ConsistentHashConsumerLoadBalancer extends AbstractConsumerLoadBala
lastClientSessionChangeId = currentClientSessionChangeId;
}
var module = ProtocolManager.moduleByProtocolId(packet.protocolId());
var module = ProtocolManager.moduleByProtocolId(ProtocolManager.protocolId(packet.getClass()));
var fastTreeMap = consistentHashMap.get(module.getId());
if (fastTreeMap == null) {
fastTreeMap = updateModuleToConsistentHash(module);
}
if (fastTreeMap == null) {
throw new RunException("ConsistentHashLoadBalancer [protocolId:{}][argument:{}], no service provides the [module:{}]", packet.protocolId(), argument, module);
throw new RunException("ConsistentHashLoadBalancer [protocol:{}][argument:{}], no service provides the [module:{}]", packet.getClass(), argument, module);
}
var nearestIndex = fastTreeMap.indexOfNearestCeilingKey(argument.hashCode());
if (nearestIndex < 0) {
throw new RunException("no service provides the [module:{}]", packet.protocolId(), argument, module);
throw new RunException("no service provides the [module:{}]", module);
}
var sid = fastTreeMap.getByIndex(nearestIndex);
var session = NetContext.getSessionManager().getClientSession(sid);
if (session == null) {
throw new RunException("unknown no service provides the [module:{}]", packet.protocolId(), argument, module);
throw new RunException("unknown no service provides the [module:{}]", module);
}
return session;
}
@@ -38,11 +38,11 @@ public class RandomConsumerLoadBalancer extends AbstractConsumerLoadBalancer {
@Override
public Session loadBalancer(IPacket packet, Object argument) {
var module = ProtocolManager.moduleByProtocolId(packet.protocolId());
var module = ProtocolManager.moduleByProtocolId(ProtocolManager.protocolId(packet.getClass()));
var sessions = getSessionsByModule(module);
if (sessions.isEmpty()) {
throw new RunException("RandomConsumerLoadBalancer [protocolId:{}][argument:{}], no service provides the [module:{}]", packet.protocolId(), argument, module);
throw new RunException("RandomConsumerLoadBalancer [protocol:{}][argument:{}], no service provides the [module:{}]", packet.getClass(), argument, module);
}
return RandomUtils.randomEle(sessions);
@@ -19,7 +19,6 @@ import com.zfoo.net.packet.DecodedPacketInfo;
import com.zfoo.net.packet.EncodedPacketInfo;
import com.zfoo.net.packet.PacketService;
import com.zfoo.net.router.attachment.IAttachment;
import com.zfoo.net.util.SessionUtils;
import com.zfoo.protocol.IPacket;
import com.zfoo.protocol.ProtocolManager;
import com.zfoo.protocol.buffer.ByteBufUtils;
@@ -28,9 +27,6 @@ import com.zfoo.protocol.util.StringUtils;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageCodec;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
@@ -94,7 +90,7 @@ public class JProtobufTcpCodecHandler extends ByteToMessageCodec<EncodedPacketIn
// header(4byte) + protocolId(2byte)
buffer.writeInt(bytes.length + 2);
var protocolId = packet.protocolId();
var protocolId = ProtocolManager.protocolId(packet.getClass());
// 写入协议号
ByteBufUtils.writeShort(buffer, protocolId);
@@ -63,8 +63,8 @@ public class JsonWebSocketCodecHandler extends MessageToMessageCodec<WebSocketFr
var packet = out.getPacket();
var attachment = out.getAttachment();
var attachmentId = attachment == null ? -1 : attachment.protocolId();
var jsonPacket = JsonPacket.valueOf(packet.protocolId(), packet, attachmentId, attachment);
var attachmentId = attachment == null ? -1 : ProtocolManager.protocolId(attachment.getClass());
var jsonPacket = JsonPacket.valueOf(ProtocolManager.protocolId(packet.getClass()), packet, attachmentId, attachment);
var bytes = StringUtils.bytes(JsonUtils.object2String(jsonPacket));
byteBuf.writeBytes(bytes);
@@ -53,7 +53,7 @@ public class Error implements IPacket {
public static Error valueOf(IPacket packet, int errorCode, String errorMessage) {
Error response = new Error();
response.module = ProtocolManager.getProtocol(packet.protocolId()).module();
response.module = ProtocolManager.getProtocol(ProtocolManager.protocolId(packet.getClass())).module();
response.errorCode = errorCode;
response.errorMessage = errorMessage;
return response;
@@ -47,7 +47,7 @@ public class Message implements IPacket {
public static Message valueOf(IPacket packet, int code, String message) {
var mess = new Message();
mess.module = ProtocolManager.moduleByProtocolId(packet.protocolId()).getId();
mess.module = ProtocolManager.moduleByProtocolId(ProtocolManager.protocolId(packet.getClass())).getId();
mess.code = code;
mess.message = message;
return mess;
@@ -160,7 +160,8 @@ public class Router implements IRouter {
var channel = session.getChannel();
if (!channel.isActive() || !channel.isWritable()) {
logger.warn("send msg error, protocolId=[{}] isActive=[{}] isWritable=[{}]", packet.protocolId(), channel.isActive(), channel.isWritable());
logger.warn("send msg error, protocol [{}] isActive=[{}] isWritable=[{}]"
, packet.getClass().getSimpleName(), channel.isActive(), channel.isWritable());
}
channel.writeAndFlush(packetInfo);
}
@@ -14,7 +14,9 @@
package com.zfoo.net.router.route;
import com.zfoo.net.packet.PacketService;
import com.zfoo.net.router.attachment.*;
import com.zfoo.net.router.attachment.GatewayAttachment;
import com.zfoo.net.router.attachment.IAttachment;
import com.zfoo.net.router.attachment.SignalAttachment;
import com.zfoo.net.router.receiver.EnhanceUtils;
import com.zfoo.net.router.receiver.IPacketReceiver;
import com.zfoo.net.router.receiver.PacketReceiver;
@@ -50,7 +52,7 @@ public abstract class PacketBus {
* The routing of the message
*/
public static void route(Session session, IPacket packet, IAttachment attachment) {
var receiver = receiverMap.get(packet.protocolId());
var receiver = receiverMap.get(ProtocolManager.protocolId(packet.getClass()));
if (receiver == null) {
var name = packet.getClass().getSimpleName();
throw new RuntimeException(StringUtils.format("no any packetReceiver:[at{}] found for this packet:[{}] or no GatewayAttachment sent back if this server is gateway", name, name));
@@ -57,7 +57,7 @@ public class ProtocolManager {
* serialize the packet into the buffer
*/
public static void write(ByteBuf buffer, IPacket packet) {
var protocolId = packet.protocolId();
var protocolId = protocolId(packet.getClass());
// write the protocolId
ByteBufUtils.writeShort(buffer, protocolId);
// write the package