diff --git a/net/src/main/java/com/zfoo/net/consumer/Consumer.java b/net/src/main/java/com/zfoo/net/consumer/Consumer.java index b1d5c953..fbb17f6e 100644 --- a/net/src/main/java/com/zfoo/net/consumer/Consumer.java +++ b/net/src/main/java/com/zfoo/net/consumer/Consumer.java @@ -74,7 +74,7 @@ public class Consumer implements IConsumer { @Override public void send(IPacket packet, Object argument) { try { - var loadBalancer = loadBalancer(ProtocolManager.moduleByProtocolId(packet.protocolId())); + var loadBalancer = loadBalancer(ProtocolManager.moduleByProtocolId(ProtocolManager.protocolId(packet.getClass()))); var session = loadBalancer.loadBalancer(packet, argument); var taskExecutorHash = TaskBus.calTaskExecutorHash(argument); NetContext.getRouter().send(session, packet, NoAnswerAttachment.valueOf(taskExecutorHash)); @@ -85,7 +85,7 @@ public class Consumer implements IConsumer { @Override public SyncAnswer syncAsk(IPacket packet, Class answerClass, Object argument) throws Exception { - var loadBalancer = loadBalancer(ProtocolManager.moduleByProtocolId(packet.protocolId())); + var loadBalancer = loadBalancer(ProtocolManager.moduleByProtocolId(ProtocolManager.protocolId(packet.getClass()))); var session = loadBalancer.loadBalancer(packet, argument); @@ -124,7 +124,7 @@ public class Consumer implements IConsumer { @Override public AsyncAnswer asyncAsk(IPacket packet, Class answerClass, Object argument) { - var loadBalancer = loadBalancer(ProtocolManager.moduleByProtocolId(packet.protocolId())); + var loadBalancer = loadBalancer(ProtocolManager.moduleByProtocolId(ProtocolManager.protocolId(packet.getClass()))); var session = loadBalancer.loadBalancer(packet, argument); var asyncAnswer = NetContext.getRouter().asyncAsk(session, packet, answerClass, argument); diff --git a/net/src/main/java/com/zfoo/net/consumer/balancer/AbstractConsumerLoadBalancer.java b/net/src/main/java/com/zfoo/net/consumer/balancer/AbstractConsumerLoadBalancer.java index f6d219c7..4c462b4e 100644 --- a/net/src/main/java/com/zfoo/net/consumer/balancer/AbstractConsumerLoadBalancer.java +++ b/net/src/main/java/com/zfoo/net/consumer/balancer/AbstractConsumerLoadBalancer.java @@ -25,7 +25,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.function.Consumer; -import java.util.stream.Collectors; /** * @author godotg @@ -49,7 +48,7 @@ public abstract class AbstractConsumerLoadBalancer implements IConsumerLoadBalan } public List getSessionsByPacket(IPacket packet) { - return getSessionsByModule(ProtocolManager.moduleByProtocolId(packet.protocolId())); + return getSessionsByModule(ProtocolManager.moduleByProtocolId(ProtocolManager.protocolId(packet.getClass()))); } public List getSessionsByModule(ProtocolModule module) { @@ -80,7 +79,7 @@ public abstract class AbstractConsumerLoadBalancer implements IConsumerLoadBalan return false; } - var module = ProtocolManager.moduleByProtocolId(packet.protocolId()); + var module = ProtocolManager.moduleByProtocolId(ProtocolManager.protocolId(packet.getClass())); return registerVO.getProviderConfig().getProviders().contains(module); } } diff --git a/net/src/main/java/com/zfoo/net/consumer/balancer/ConsistentHashConsumerLoadBalancer.java b/net/src/main/java/com/zfoo/net/consumer/balancer/ConsistentHashConsumerLoadBalancer.java index d6203ce6..5a6ff2ea 100644 --- a/net/src/main/java/com/zfoo/net/consumer/balancer/ConsistentHashConsumerLoadBalancer.java +++ b/net/src/main/java/com/zfoo/net/consumer/balancer/ConsistentHashConsumerLoadBalancer.java @@ -15,6 +15,7 @@ package com.zfoo.net.consumer.balancer; import com.zfoo.net.NetContext; import com.zfoo.net.session.Session; +import com.zfoo.net.util.ConsistentHash; import com.zfoo.net.util.FastTreeMapIntLong; import com.zfoo.protocol.IPacket; import com.zfoo.protocol.ProtocolManager; @@ -22,7 +23,6 @@ import com.zfoo.protocol.collection.CollectionUtils; import com.zfoo.protocol.exception.RunException; import com.zfoo.protocol.model.Pair; import com.zfoo.protocol.registration.ProtocolModule; -import com.zfoo.net.util.ConsistentHash; import org.springframework.lang.Nullable; import java.util.TreeMap; @@ -79,22 +79,22 @@ public class ConsistentHashConsumerLoadBalancer extends AbstractConsumerLoadBala lastClientSessionChangeId = currentClientSessionChangeId; } - var module = ProtocolManager.moduleByProtocolId(packet.protocolId()); + var module = ProtocolManager.moduleByProtocolId(ProtocolManager.protocolId(packet.getClass())); var fastTreeMap = consistentHashMap.get(module.getId()); if (fastTreeMap == null) { fastTreeMap = updateModuleToConsistentHash(module); } if (fastTreeMap == null) { - throw new RunException("ConsistentHashLoadBalancer [protocolId:{}][argument:{}], no service provides the [module:{}]", packet.protocolId(), argument, module); + throw new RunException("ConsistentHashLoadBalancer [protocol:{}][argument:{}], no service provides the [module:{}]", packet.getClass(), argument, module); } var nearestIndex = fastTreeMap.indexOfNearestCeilingKey(argument.hashCode()); if (nearestIndex < 0) { - throw new RunException("no service provides the [module:{}]", packet.protocolId(), argument, module); + throw new RunException("no service provides the [module:{}]", module); } var sid = fastTreeMap.getByIndex(nearestIndex); var session = NetContext.getSessionManager().getClientSession(sid); if (session == null) { - throw new RunException("unknown no service provides the [module:{}]", packet.protocolId(), argument, module); + throw new RunException("unknown no service provides the [module:{}]", module); } return session; } diff --git a/net/src/main/java/com/zfoo/net/consumer/balancer/RandomConsumerLoadBalancer.java b/net/src/main/java/com/zfoo/net/consumer/balancer/RandomConsumerLoadBalancer.java index b01dfd65..5e52c11a 100644 --- a/net/src/main/java/com/zfoo/net/consumer/balancer/RandomConsumerLoadBalancer.java +++ b/net/src/main/java/com/zfoo/net/consumer/balancer/RandomConsumerLoadBalancer.java @@ -38,11 +38,11 @@ public class RandomConsumerLoadBalancer extends AbstractConsumerLoadBalancer { @Override public Session loadBalancer(IPacket packet, Object argument) { - var module = ProtocolManager.moduleByProtocolId(packet.protocolId()); + var module = ProtocolManager.moduleByProtocolId(ProtocolManager.protocolId(packet.getClass())); var sessions = getSessionsByModule(module); if (sessions.isEmpty()) { - throw new RunException("RandomConsumerLoadBalancer [protocolId:{}][argument:{}], no service provides the [module:{}]", packet.protocolId(), argument, module); + throw new RunException("RandomConsumerLoadBalancer [protocol:{}][argument:{}], no service provides the [module:{}]", packet.getClass(), argument, module); } return RandomUtils.randomEle(sessions); diff --git a/net/src/main/java/com/zfoo/net/handler/codec/jprotobuf/JProtobufTcpCodecHandler.java b/net/src/main/java/com/zfoo/net/handler/codec/jprotobuf/JProtobufTcpCodecHandler.java index c87792b0..c1f75bdb 100644 --- a/net/src/main/java/com/zfoo/net/handler/codec/jprotobuf/JProtobufTcpCodecHandler.java +++ b/net/src/main/java/com/zfoo/net/handler/codec/jprotobuf/JProtobufTcpCodecHandler.java @@ -19,7 +19,6 @@ import com.zfoo.net.packet.DecodedPacketInfo; import com.zfoo.net.packet.EncodedPacketInfo; import com.zfoo.net.packet.PacketService; import com.zfoo.net.router.attachment.IAttachment; -import com.zfoo.net.util.SessionUtils; import com.zfoo.protocol.IPacket; import com.zfoo.protocol.ProtocolManager; import com.zfoo.protocol.buffer.ByteBufUtils; @@ -28,9 +27,6 @@ import com.zfoo.protocol.util.StringUtils; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageCodec; -import io.netty.util.ReferenceCountUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.List; @@ -94,7 +90,7 @@ public class JProtobufTcpCodecHandler extends ByteToMessageCodec