From 059b0f79dcf23f623a8f0f409da3f177aab56f2b Mon Sep 17 00:00:00 2001 From: godotg Date: Sat, 13 Jan 2024 15:39:57 +0800 Subject: [PATCH] perf[loadbalancer]: cached consistent hash loadbalancer use protocolModuleId and argument as key --- .../CachedConsistentHashLoadBalancer.java | 58 +++++++++++-------- 1 file changed, 35 insertions(+), 23 deletions(-) diff --git a/net/src/main/java/com/zfoo/net/consumer/balancer/CachedConsistentHashLoadBalancer.java b/net/src/main/java/com/zfoo/net/consumer/balancer/CachedConsistentHashLoadBalancer.java index a58678d4..655c0352 100644 --- a/net/src/main/java/com/zfoo/net/consumer/balancer/CachedConsistentHashLoadBalancer.java +++ b/net/src/main/java/com/zfoo/net/consumer/balancer/CachedConsistentHashLoadBalancer.java @@ -1,26 +1,33 @@ package com.zfoo.net.consumer.balancer; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; import com.zfoo.net.NetContext; import com.zfoo.net.session.Session; -import org.apache.curator.shaded.com.google.common.util.concurrent.AtomicLongMap; +import com.zfoo.protocol.ProtocolManager; +import com.zfoo.scheduler.util.TimeUtils; import java.util.List; -import java.util.Map; +import java.util.concurrent.TimeUnit; /** * 记忆化一致性hash * * @author qmr - * @date 2024/1/6 */ -public class CachedConsistentHashLoadBalancer extends ConsistentHashLoadBalancer { +public class CachedConsistentHashLoadBalancer extends AbstractConsumerLoadBalancer { public static final CachedConsistentHashLoadBalancer INSTANCE = new CachedConsistentHashLoadBalancer(); + private static final long EXPIRED_ACCESS_DURATION = 10 * TimeUtils.MILLIS_PER_MINUTE; + private static final long MAX_CACHE_SIZE = 10_0000; /** - * 存储已经负载后的sid + * cache the sid after load balancer,key:[protocolModuleId + argument] -> value:[sid] */ - private static final AtomicLongMap uid2sidMap = AtomicLongMap.create(); + private Cache cache = Caffeine.newBuilder() + .expireAfterAccess(EXPIRED_ACCESS_DURATION, TimeUnit.MILLISECONDS) + .maximumSize(MAX_CACHE_SIZE) + .build(); private CachedConsistentHashLoadBalancer() { } @@ -31,26 +38,31 @@ public class CachedConsistentHashLoadBalancer extends ConsistentHashLoadBalancer @Override public Session selectProvider(List providers, Object packet, Object argument) { + if (argument == null) { + return RandomLoadBalancer.getInstance().selectProvider(providers, packet, argument); + } - if (argument instanceof Long) { - long sid = uid2sidMap.get((Long) argument); - if (sid > 0L) { - Session memorySession = NetContext.getSessionManager().getClientSession(sid); - if (null != memorySession) { - return memorySession; - } else { - uid2sidMap.remove((Long) argument); - } - } + if (!(argument instanceof Number)) { + return ConsistentHashLoadBalancer.getInstance().selectProvider(providers, packet, argument); } - Session loadBalancer = super.selectProvider(providers, packet, argument); - if (argument instanceof Long){ - uid2sidMap.put((Long) argument, loadBalancer.getSid()); + + var arg = (Number) argument; + var protocolModuleId = (long) ProtocolManager.moduleByProtocol(packet.getClass()).getId(); + // 8 Byte cachedKey = 1 byte of protocolModuleId + 7 byte of argument + var cachedKey = (protocolModuleId << (7 * 8)) | (0X00FFFFFF_FFFFFFFFL & arg.longValue()); + var sid = cache.getIfPresent(cachedKey); + if (sid == null) { + var providerSession = ConsistentHashLoadBalancer.getInstance().selectProvider(providers, packet, argument); + cache.put(cachedKey, providerSession.getSid()); + return providerSession; } - return loadBalancer; + + var session = NetContext.getSessionManager().getClientSession(sid); + if (session == null) { + session = ConsistentHashLoadBalancer.getInstance().selectProvider(providers, packet, argument); + cache.put(cachedKey, session.getSid()); + } + return session; } - public static Map getUid2sidMap(){ - return uid2sidMap.asMap(); - } }