Merge pull request #86 from qiaomengrui/zfoo-3.2.0.a

feat[load balancer]: Consistent Hash Of Memory Consumer LoadBalancer
This commit is contained in:
godotg
2024-01-06 14:37:16 +08:00
committed by GitHub
14 changed files with 111 additions and 41 deletions
+2 -2
View File
@@ -7,10 +7,10 @@
<groupId>com.zfoo</groupId>
<artifactId>boot</artifactId>
<packaging>jar</packaging>
<version>3.2.0</version>
<version>3.2.0.a</version>
<properties>
<zfoo.version>3.2.0</zfoo.version>
<zfoo.version>3.2.0.a</zfoo.version>
<!-- spring and spring boot -->
<spring.version>6.0.11</spring.version>
<spring.boot.version>3.1.3</spring.boot.version>
+2 -2
View File
@@ -7,10 +7,10 @@
<groupId>com.zfoo</groupId>
<artifactId>event</artifactId>
<packaging>jar</packaging>
<version>3.2.0</version>
<version>3.2.0.a</version>
<properties>
<zfoo.version>3.2.0</zfoo.version>
<zfoo.version>3.2.0.a</zfoo.version>
<!-- spring and spring boot -->
<spring.version>6.0.11</spring.version>
<spring.boot.version>3.1.3</spring.boot.version>
+2 -2
View File
@@ -7,10 +7,10 @@
<groupId>com.zfoo</groupId>
<artifactId>hotswap</artifactId>
<packaging>jar</packaging>
<version>3.2.0</version>
<version>3.2.0.a</version>
<properties>
<zfoo.version>3.2.0</zfoo.version>
<zfoo.version>3.2.0.a</zfoo.version>
<!-- spring and spring boot -->
<spring.version>6.0.11</spring.version>
<spring.boot.version>3.1.3</spring.boot.version>
+2 -2
View File
@@ -7,10 +7,10 @@
<groupId>com.zfoo</groupId>
<artifactId>monitor</artifactId>
<packaging>jar</packaging>
<version>3.2.0</version>
<version>3.2.0.a</version>
<properties>
<zfoo.version>3.2.0</zfoo.version>
<zfoo.version>3.2.0.a</zfoo.version>
<!-- spring and spring boot -->
<spring.version>6.0.11</spring.version>
<spring.boot.version>3.1.3</spring.boot.version>
+2 -2
View File
@@ -7,10 +7,10 @@
<groupId>com.zfoo</groupId>
<artifactId>net</artifactId>
<packaging>jar</packaging>
<version>3.2.0</version>
<version>3.2.0.a</version>
<properties>
<zfoo.version>3.2.0</zfoo.version>
<zfoo.version>3.2.0.a</zfoo.version>
<!-- spring and spring boot -->
<spring.version>6.0.11</spring.version>
<spring.boot.version>3.1.3</spring.boot.version>
@@ -39,6 +39,9 @@ public abstract class AbstractConsumerLoadBalancer implements IConsumerLoadBalan
case "consistent-hash":
balancer = ConsistentHashConsumerLoadBalancer.getInstance();
break;
case "consistent-hash-of-memory":
balancer = ConsistentHashOfMemoryConsumerLoadBalancer.getInstance();
break;
default:
throw new RuntimeException(StringUtils.format("Load balancer is not recognized[{}]", loadBalancer));
}
@@ -17,6 +17,7 @@ import com.zfoo.net.NetContext;
import com.zfoo.net.session.Session;
import com.zfoo.net.util.ConsistentHash;
import com.zfoo.net.util.FastTreeMapIntLong;
import com.zfoo.net.util.HashUtils;
import com.zfoo.protocol.ProtocolManager;
import com.zfoo.protocol.collection.CollectionUtils;
import com.zfoo.protocol.exception.RunException;
@@ -42,7 +43,7 @@ public class ConsistentHashConsumerLoadBalancer extends AbstractConsumerLoadBala
private static final AtomicReferenceArray<FastTreeMapIntLong> consistentHashMap = new AtomicReferenceArray<>(ProtocolManager.MAX_MODULE_NUM);
private static final int VIRTUAL_NODE_NUMS = 200;
private ConsistentHashConsumerLoadBalancer() {
public ConsistentHashConsumerLoadBalancer() {
}
public static ConsistentHashConsumerLoadBalancer getInstance() {
@@ -62,6 +63,29 @@ public class ConsistentHashConsumerLoadBalancer extends AbstractConsumerLoadBala
return RandomConsumerLoadBalancer.getInstance().loadBalancer(packet, argument);
}
updateConsistentHashMap();
var module = ProtocolManager.moduleByProtocol(packet.getClass());
var fastTreeMap = consistentHashMap.get(module.getId());
if (fastTreeMap == null) {
fastTreeMap = updateModuleToConsistentHash(module);
}
if (fastTreeMap == null) {
throw new RunException("ConsistentHashLoadBalancer [protocol:{}][argument:{}], no service provides the [module:{}]", packet.getClass(), argument, module);
}
var nearestIndex = fastTreeMap.indexOfNearestCeilingKey(HashUtils.fnvHash(argument));
if (nearestIndex < 0) {
throw new RunException("no service provides the [module:{}]", module);
}
var sid = fastTreeMap.getByIndex(nearestIndex);
var session = NetContext.getSessionManager().getClientSession(sid);
if (session == null) {
throw new RunException("unknown no service provides the [module:{}]", module);
}
return session;
}
private void updateConsistentHashMap() {
// 如果更新时间不匹配,则更新到最新的服务提供者
var currentClientSessionChangeId = NetContext.getSessionManager().getClientSessionChangeId();
if (currentClientSessionChangeId != lastClientSessionChangeId) {
@@ -75,25 +99,6 @@ public class ConsistentHashConsumerLoadBalancer extends AbstractConsumerLoadBala
}
lastClientSessionChangeId = currentClientSessionChangeId;
}
var module = ProtocolManager.moduleByProtocol(packet.getClass());
var fastTreeMap = consistentHashMap.get(module.getId());
if (fastTreeMap == null) {
fastTreeMap = updateModuleToConsistentHash(module);
}
if (fastTreeMap == null) {
throw new RunException("ConsistentHashLoadBalancer [protocol:{}][argument:{}], no service provides the [module:{}]", packet.getClass(), argument, module);
}
var nearestIndex = fastTreeMap.indexOfNearestCeilingKey(argument.hashCode());
if (nearestIndex < 0) {
throw new RunException("no service provides the [module:{}]", module);
}
var sid = fastTreeMap.getByIndex(nearestIndex);
var session = NetContext.getSessionManager().getClientSession(sid);
if (session == null) {
throw new RunException("unknown no service provides the [module:{}]", module);
}
return session;
}
@@ -0,0 +1,57 @@
package com.zfoo.net.consumer.balancer;
import com.zfoo.net.NetContext;
import com.zfoo.net.session.Session;
import org.apache.curator.shaded.com.google.common.collect.Lists;
import org.apache.curator.shaded.com.google.common.util.concurrent.AtomicLongMap;
import java.util.List;
import java.util.Map;
/**
* 记忆化一致性hash
*
* @author qmr
* @date 2024/1/6
*/
public class ConsistentHashOfMemoryConsumerLoadBalancer extends ConsistentHashConsumerLoadBalancer {
public static final ConsistentHashOfMemoryConsumerLoadBalancer INSTANCE = new ConsistentHashOfMemoryConsumerLoadBalancer();
/**
* 存储已经负载后的sid
*/
private static final AtomicLongMap<Long> uid2sidMap = AtomicLongMap.create();
private ConsistentHashOfMemoryConsumerLoadBalancer() {
}
public static ConsistentHashOfMemoryConsumerLoadBalancer getInstance() {
return INSTANCE;
}
@Override
public Session loadBalancer(Object packet, Object argument) {
if (argument instanceof Long) {
long sid = uid2sidMap.get((Long) argument);
if (sid > 0L) {
Session memorySession = NetContext.getSessionManager().getClientSession(sid);
if (null != memorySession){
return memorySession;
}else {
uid2sidMap.remove((Long) argument);
}
}
}
Session loadBalancer = super.loadBalancer(packet, argument);
if (argument instanceof Long){
uid2sidMap.put((Long) argument, loadBalancer.getSid());
}
return loadBalancer;
}
public static Map<Long, Long> getUid2sidMap(){
return uid2sidMap.asMap();
}
}
@@ -16,6 +16,7 @@ package com.zfoo.net.handler;
import com.zfoo.event.manager.EventBus;
import com.zfoo.net.NetContext;
import com.zfoo.net.consumer.balancer.ConsistentHashConsumerLoadBalancer;
import com.zfoo.net.consumer.balancer.IConsumerLoadBalancer;
import com.zfoo.net.core.gateway.IGatewayLoadBalancer;
import com.zfoo.net.core.gateway.model.GatewaySessionInactiveEvent;
import com.zfoo.net.packet.DecodedPacketInfo;
@@ -26,6 +27,7 @@ import com.zfoo.net.router.attachment.GatewayAttachment;
import com.zfoo.net.router.attachment.SignalAttachment;
import com.zfoo.net.session.Session;
import com.zfoo.net.util.SessionUtils;
import com.zfoo.protocol.ProtocolManager;
import com.zfoo.protocol.util.JsonUtils;
import com.zfoo.protocol.util.StringUtils;
import com.zfoo.scheduler.util.TimeUtils;
@@ -114,7 +116,10 @@ public class GatewayRouteHandler extends ServerRouteHandler {
*/
private void forwardingPacket(Object packet, Object attachment, Object argument) {
try {
var consumerSession = ConsistentHashConsumerLoadBalancer.getInstance().loadBalancer(packet, argument);
// 网关统一用 moduleid uid 获取 session
var loadBalancer = NetContext.getConsumer().loadBalancer(ProtocolManager.moduleByProtocol(packet.getClass()));
Session consumerSession = loadBalancer.loadBalancer(packet, argument);
// var consumerSession = ConsistentHashConsumerLoadBalancer.getInstance().loadBalancer(packet, argument);
NetContext.getRouter().send(consumerSession, packet, attachment);
} catch (Exception e) {
logger.error("An exception occurred at the gateway", e);
+2 -2
View File
@@ -7,10 +7,10 @@
<groupId>com.zfoo</groupId>
<artifactId>orm</artifactId>
<packaging>jar</packaging>
<version>3.2.0</version>
<version>3.2.0.a</version>
<properties>
<zfoo.version>3.2.0</zfoo.version>
<zfoo.version>3.2.0.a</zfoo.version>
<!-- spring and spring boot -->
<spring.version>6.0.11</spring.version>
<spring.boot.version>3.1.3</spring.boot.version>
+2 -2
View File
@@ -7,7 +7,7 @@
<groupId>com.zfoo</groupId>
<artifactId>zfoo</artifactId>
<packaging>pom</packaging>
<version>3.2.0</version>
<version>3.2.0.a</version>
<name>zfoo</name>
@@ -126,7 +126,7 @@
</modules>
<properties>
<zfoo.version>3.2.0</zfoo.version>
<zfoo.version>3.2.0.a</zfoo.version>
<!-- spring and spring boot -->
<spring.version>6.0.11</spring.version>
<spring.boot.version>3.1.3</spring.boot.version>
+2 -2
View File
@@ -7,10 +7,10 @@
<groupId>com.zfoo</groupId>
<artifactId>protocol</artifactId>
<packaging>jar</packaging>
<version>3.2.0</version>
<version>3.2.0.a</version>
<properties>
<zfoo.version>3.2.0</zfoo.version>
<zfoo.version>3.2.0.a</zfoo.version>
<!-- spring and spring boot -->
<spring.version>6.0.11</spring.version>
<spring.boot.version>3.1.3</spring.boot.version>
+2 -2
View File
@@ -7,10 +7,10 @@
<groupId>com.zfoo</groupId>
<artifactId>scheduler</artifactId>
<packaging>jar</packaging>
<version>3.2.0</version>
<version>3.2.0.a</version>
<properties>
<zfoo.version>3.2.0</zfoo.version>
<zfoo.version>3.2.0.a</zfoo.version>
<!-- spring and spring boot -->
<spring.version>6.0.11</spring.version>
<spring.boot.version>3.1.3</spring.boot.version>
+2 -2
View File
@@ -7,10 +7,10 @@
<groupId>com.zfoo</groupId>
<artifactId>storage</artifactId>
<packaging>jar</packaging>
<version>3.2.0</version>
<version>3.2.0.a</version>
<properties>
<zfoo.version>3.2.0</zfoo.version>
<zfoo.version>3.2.0.a</zfoo.version>
<!-- spring and spring boot -->
<spring.version>6.0.11</spring.version>
<spring.boot.version>3.1.3</spring.boot.version>