mirror of
https://github.com/tiennm99/zfoo.git
synced 2026-05-25 15:59:50 +00:00
feat[load-balancer]: fast load balancer
This commit is contained in:
+31
-11
@@ -15,6 +15,7 @@ package com.zfoo.net.consumer.balancer;
|
||||
|
||||
import com.zfoo.net.NetContext;
|
||||
import com.zfoo.net.session.Session;
|
||||
import com.zfoo.net.util.ReadOnlyTreeMapIntLong;
|
||||
import com.zfoo.protocol.IPacket;
|
||||
import com.zfoo.protocol.ProtocolManager;
|
||||
import com.zfoo.protocol.collection.CollectionUtils;
|
||||
@@ -24,6 +25,7 @@ import com.zfoo.protocol.registration.ProtocolModule;
|
||||
import com.zfoo.util.math.ConsistentHash;
|
||||
import org.springframework.lang.Nullable;
|
||||
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.atomic.AtomicReferenceArray;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@@ -40,7 +42,7 @@ public class ConsistentHashConsumerLoadBalancer extends AbstractConsumerLoadBala
|
||||
public static final ConsistentHashConsumerLoadBalancer INSTANCE = new ConsistentHashConsumerLoadBalancer();
|
||||
|
||||
private volatile int lastClientSessionChangeId = 0;
|
||||
private static final AtomicReferenceArray<ConsistentHash<String, Long>> consistentHashMap = new AtomicReferenceArray<>(ProtocolManager.MAX_MODULE_NUM);
|
||||
private static final AtomicReferenceArray<ReadOnlyTreeMapIntLong> consistentHashMap = new AtomicReferenceArray<>(ProtocolManager.MAX_MODULE_NUM);
|
||||
private static final int VIRTUAL_NODE_NUMS = 200;
|
||||
|
||||
private ConsistentHashConsumerLoadBalancer() {
|
||||
@@ -78,28 +80,46 @@ public class ConsistentHashConsumerLoadBalancer extends AbstractConsumerLoadBala
|
||||
}
|
||||
|
||||
var module = ProtocolManager.moduleByProtocolId(packet.protocolId());
|
||||
var consistentHash = consistentHashMap.get(module.getId());
|
||||
if (consistentHash == null) {
|
||||
consistentHash = updateModuleToConsistentHash(module);
|
||||
var treeMap = consistentHashMap.get(module.getId());
|
||||
if (treeMap == null) {
|
||||
treeMap = updateModuleToConsistentHash(module);
|
||||
}
|
||||
if (consistentHash == null) {
|
||||
if (treeMap == null) {
|
||||
throw new RunException("ConsistentHashLoadBalancer [protocolId:{}][argument:{}], no service provides the [module:{}]", packet.protocolId(), argument, module);
|
||||
}
|
||||
var sid = consistentHash.getRealNode(argument).getValue();
|
||||
return NetContext.getSessionManager().getClientSession(sid);
|
||||
var index = treeMap.indexOfCeilingKey(argument.hashCode());
|
||||
if (index < 0) {
|
||||
throw new RunException("no service provides the [module:{}]", packet.protocolId(), argument, module);
|
||||
}
|
||||
var sid = treeMap.getByIndex(index);
|
||||
var session = NetContext.getSessionManager().getClientSession(sid);
|
||||
if (session == null) {
|
||||
throw new RunException("unknown no service provides the [module:{}]", packet.protocolId(), argument, module);
|
||||
}
|
||||
return session;
|
||||
}
|
||||
|
||||
|
||||
@Nullable
|
||||
private ConsistentHash<String, Long> updateModuleToConsistentHash(ProtocolModule module) {
|
||||
private ReadOnlyTreeMapIntLong updateModuleToConsistentHash(ProtocolModule module) {
|
||||
var sessionStringList = getSessionsByModule(module).stream()
|
||||
.map(session -> new Pair<>(session.getConsumerAttribute().toString(), session.getSid()))
|
||||
.sorted((a, b) -> a.getKey().compareTo(b.getKey()))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
var consistentHash = CollectionUtils.isNotEmpty(sessionStringList) ? new ConsistentHash<>(sessionStringList, VIRTUAL_NODE_NUMS) : null;
|
||||
consistentHashMap.set(module.getId(), consistentHash);
|
||||
return consistentHash;
|
||||
var consistentHash = new ConsistentHash<>(sessionStringList, VIRTUAL_NODE_NUMS);
|
||||
var virtualNodeTreeMap = consistentHash.getVirtualNodeTreeMap();
|
||||
if (CollectionUtils.isEmpty(virtualNodeTreeMap)) {
|
||||
consistentHashMap.set(module.getId(), null);
|
||||
return null;
|
||||
}
|
||||
var virtualTreeMap = new TreeMap<Integer, Long>();
|
||||
for (var entry : virtualNodeTreeMap.entrySet()) {
|
||||
virtualTreeMap.put(entry.getKey(), entry.getValue().getValue());
|
||||
}
|
||||
var treeMap = new ReadOnlyTreeMapIntLong(virtualTreeMap);
|
||||
consistentHashMap.set(module.getId(), treeMap);
|
||||
return treeMap;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -15,7 +15,10 @@ package com.zfoo.util.math;
|
||||
|
||||
import com.zfoo.protocol.model.Pair;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.TreeMap;
|
||||
|
||||
/**
|
||||
* 带虚拟节点的一致性Hash算法,参考:http://www.zsythink.net/archives/1182
|
||||
@@ -69,5 +72,7 @@ public class ConsistentHash<K, V> {
|
||||
return entry.getValue();
|
||||
}
|
||||
|
||||
|
||||
public TreeMap<Integer, Pair<K, V>> getVirtualNodeTreeMap() {
|
||||
return virtualNodeTreeMap;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user