mirror of
https://github.com/tiennm99/zfoo.git
synced 2026-05-19 17:29:39 +00:00
perf[loadbalancer]: cached consistent hash loadbalancer use protocolModuleId and argument as key
This commit is contained in:
+35
-23
@@ -1,26 +1,33 @@
|
||||
package com.zfoo.net.consumer.balancer;
|
||||
|
||||
import com.github.benmanes.caffeine.cache.Cache;
|
||||
import com.github.benmanes.caffeine.cache.Caffeine;
|
||||
import com.zfoo.net.NetContext;
|
||||
import com.zfoo.net.session.Session;
|
||||
import org.apache.curator.shaded.com.google.common.util.concurrent.AtomicLongMap;
|
||||
import com.zfoo.protocol.ProtocolManager;
|
||||
import com.zfoo.scheduler.util.TimeUtils;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* 记忆化一致性hash
|
||||
*
|
||||
* @author qmr
|
||||
* @date 2024/1/6
|
||||
*/
|
||||
public class CachedConsistentHashLoadBalancer extends ConsistentHashLoadBalancer {
|
||||
public class CachedConsistentHashLoadBalancer extends AbstractConsumerLoadBalancer {
|
||||
|
||||
public static final CachedConsistentHashLoadBalancer INSTANCE = new CachedConsistentHashLoadBalancer();
|
||||
private static final long EXPIRED_ACCESS_DURATION = 10 * TimeUtils.MILLIS_PER_MINUTE;
|
||||
private static final long MAX_CACHE_SIZE = 10_0000;
|
||||
|
||||
/**
|
||||
* 存储已经负载后的sid
|
||||
* cache the sid after load balancer,key:[protocolModuleId + argument] -> value:[sid]
|
||||
*/
|
||||
private static final AtomicLongMap<Long> uid2sidMap = AtomicLongMap.create();
|
||||
private Cache<Long, Long> cache = Caffeine.newBuilder()
|
||||
.expireAfterAccess(EXPIRED_ACCESS_DURATION, TimeUnit.MILLISECONDS)
|
||||
.maximumSize(MAX_CACHE_SIZE)
|
||||
.build();
|
||||
|
||||
private CachedConsistentHashLoadBalancer() {
|
||||
}
|
||||
@@ -31,26 +38,31 @@ public class CachedConsistentHashLoadBalancer extends ConsistentHashLoadBalancer
|
||||
|
||||
@Override
|
||||
public Session selectProvider(List<Session> providers, Object packet, Object argument) {
|
||||
if (argument == null) {
|
||||
return RandomLoadBalancer.getInstance().selectProvider(providers, packet, argument);
|
||||
}
|
||||
|
||||
if (argument instanceof Long) {
|
||||
long sid = uid2sidMap.get((Long) argument);
|
||||
if (sid > 0L) {
|
||||
Session memorySession = NetContext.getSessionManager().getClientSession(sid);
|
||||
if (null != memorySession) {
|
||||
return memorySession;
|
||||
} else {
|
||||
uid2sidMap.remove((Long) argument);
|
||||
}
|
||||
}
|
||||
if (!(argument instanceof Number)) {
|
||||
return ConsistentHashLoadBalancer.getInstance().selectProvider(providers, packet, argument);
|
||||
}
|
||||
Session loadBalancer = super.selectProvider(providers, packet, argument);
|
||||
if (argument instanceof Long){
|
||||
uid2sidMap.put((Long) argument, loadBalancer.getSid());
|
||||
|
||||
var arg = (Number) argument;
|
||||
var protocolModuleId = (long) ProtocolManager.moduleByProtocol(packet.getClass()).getId();
|
||||
// 8 Byte cachedKey = 1 byte of protocolModuleId + 7 byte of argument
|
||||
var cachedKey = (protocolModuleId << (7 * 8)) | (0X00FFFFFF_FFFFFFFFL & arg.longValue());
|
||||
var sid = cache.getIfPresent(cachedKey);
|
||||
if (sid == null) {
|
||||
var providerSession = ConsistentHashLoadBalancer.getInstance().selectProvider(providers, packet, argument);
|
||||
cache.put(cachedKey, providerSession.getSid());
|
||||
return providerSession;
|
||||
}
|
||||
return loadBalancer;
|
||||
|
||||
var session = NetContext.getSessionManager().getClientSession(sid);
|
||||
if (session == null) {
|
||||
session = ConsistentHashLoadBalancer.getInstance().selectProvider(providers, packet, argument);
|
||||
cache.put(cachedKey, session.getSid());
|
||||
}
|
||||
return session;
|
||||
}
|
||||
|
||||
public static Map<Long, Long> getUid2sidMap(){
|
||||
return uid2sidMap.asMap();
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user