diff --git a/net/src/main/java/com/zfoo/net/consumer/balancer/ConsistentHashConsumerLoadBalancer.java b/net/src/main/java/com/zfoo/net/consumer/balancer/ConsistentHashConsumerLoadBalancer.java index 9595e09c..d757cbfc 100644 --- a/net/src/main/java/com/zfoo/net/consumer/balancer/ConsistentHashConsumerLoadBalancer.java +++ b/net/src/main/java/com/zfoo/net/consumer/balancer/ConsistentHashConsumerLoadBalancer.java @@ -15,6 +15,7 @@ package com.zfoo.net.consumer.balancer; import com.zfoo.net.NetContext; import com.zfoo.net.session.Session; +import com.zfoo.net.util.ReadOnlyTreeMapIntLong; import com.zfoo.protocol.IPacket; import com.zfoo.protocol.ProtocolManager; import com.zfoo.protocol.collection.CollectionUtils; @@ -24,6 +25,7 @@ import com.zfoo.protocol.registration.ProtocolModule; import com.zfoo.util.math.ConsistentHash; import org.springframework.lang.Nullable; +import java.util.TreeMap; import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.stream.Collectors; @@ -40,7 +42,7 @@ public class ConsistentHashConsumerLoadBalancer extends AbstractConsumerLoadBala public static final ConsistentHashConsumerLoadBalancer INSTANCE = new ConsistentHashConsumerLoadBalancer(); private volatile int lastClientSessionChangeId = 0; - private static final AtomicReferenceArray> consistentHashMap = new AtomicReferenceArray<>(ProtocolManager.MAX_MODULE_NUM); + private static final AtomicReferenceArray consistentHashMap = new AtomicReferenceArray<>(ProtocolManager.MAX_MODULE_NUM); private static final int VIRTUAL_NODE_NUMS = 200; private ConsistentHashConsumerLoadBalancer() { @@ -78,28 +80,46 @@ public class ConsistentHashConsumerLoadBalancer extends AbstractConsumerLoadBala } var module = ProtocolManager.moduleByProtocolId(packet.protocolId()); - var consistentHash = consistentHashMap.get(module.getId()); - if (consistentHash == null) { - consistentHash = updateModuleToConsistentHash(module); + var treeMap = consistentHashMap.get(module.getId()); + if (treeMap == null) { + treeMap = updateModuleToConsistentHash(module); } - if (consistentHash == null) { + if (treeMap == null) { throw new RunException("ConsistentHashLoadBalancer [protocolId:{}][argument:{}], no service provides the [module:{}]", packet.protocolId(), argument, module); } - var sid = consistentHash.getRealNode(argument).getValue(); - return NetContext.getSessionManager().getClientSession(sid); + var index = treeMap.indexOfCeilingKey(argument.hashCode()); + if (index < 0) { + throw new RunException("no service provides the [module:{}]", packet.protocolId(), argument, module); + } + var sid = treeMap.getByIndex(index); + var session = NetContext.getSessionManager().getClientSession(sid); + if (session == null) { + throw new RunException("unknown no service provides the [module:{}]", packet.protocolId(), argument, module); + } + return session; } @Nullable - private ConsistentHash updateModuleToConsistentHash(ProtocolModule module) { + private ReadOnlyTreeMapIntLong updateModuleToConsistentHash(ProtocolModule module) { var sessionStringList = getSessionsByModule(module).stream() .map(session -> new Pair<>(session.getConsumerAttribute().toString(), session.getSid())) .sorted((a, b) -> a.getKey().compareTo(b.getKey())) .collect(Collectors.toList()); - var consistentHash = CollectionUtils.isNotEmpty(sessionStringList) ? new ConsistentHash<>(sessionStringList, VIRTUAL_NODE_NUMS) : null; - consistentHashMap.set(module.getId(), consistentHash); - return consistentHash; + var consistentHash = new ConsistentHash<>(sessionStringList, VIRTUAL_NODE_NUMS); + var virtualNodeTreeMap = consistentHash.getVirtualNodeTreeMap(); + if (CollectionUtils.isEmpty(virtualNodeTreeMap)) { + consistentHashMap.set(module.getId(), null); + return null; + } + var virtualTreeMap = new TreeMap(); + for (var entry : virtualNodeTreeMap.entrySet()) { + virtualTreeMap.put(entry.getKey(), entry.getValue().getValue()); + } + var treeMap = new ReadOnlyTreeMapIntLong(virtualTreeMap); + consistentHashMap.set(module.getId(), treeMap); + return treeMap; } } diff --git a/util/src/main/java/com/zfoo/util/math/ConsistentHash.java b/util/src/main/java/com/zfoo/util/math/ConsistentHash.java index f5996905..9e2818d1 100644 --- a/util/src/main/java/com/zfoo/util/math/ConsistentHash.java +++ b/util/src/main/java/com/zfoo/util/math/ConsistentHash.java @@ -15,7 +15,10 @@ package com.zfoo.util.math; import com.zfoo.protocol.model.Pair; -import java.util.*; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.TreeMap; /** * 带虚拟节点的一致性Hash算法,参考:http://www.zsythink.net/archives/1182 @@ -69,5 +72,7 @@ public class ConsistentHash { return entry.getValue(); } - + public TreeMap> getVirtualNodeTreeMap() { + return virtualNodeTreeMap; + } }