Merge pull request #17 from meiwei666/main

perf[net]: 优化了comsumer消费方式选择
This commit is contained in:
jaysunxiao
2022-07-01 13:20:00 +08:00
committed by GitHub
12 changed files with 96 additions and 34 deletions
@@ -15,8 +15,10 @@ package com.zfoo.net.config.manager;
import com.zfoo.net.config.model.NetConfig;
import com.zfoo.net.consumer.balancer.AbstractConsumerLoadBalancer;
import com.zfoo.net.consumer.balancer.IConsumerLoadBalancer;
import com.zfoo.net.consumer.registry.IRegistry;
import com.zfoo.net.consumer.registry.ZookeeperRegistry;
import com.zfoo.net.session.model.Session;
import com.zfoo.protocol.ProtocolManager;
import com.zfoo.protocol.collection.CollectionUtils;
import com.zfoo.protocol.registration.ProtocolModule;
@@ -25,7 +27,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author jaysunxiao
@@ -42,6 +47,8 @@ public class ConfigManager implements IConfigManager {
private AbstractConsumerLoadBalancer consumerLoadBalancer;
private final Map<String, IConsumerLoadBalancer> consumerLoadBalancerMap = new ConcurrentHashMap<>();
/**
* 注册中心
*/
@@ -57,8 +64,8 @@ public class ConfigManager implements IConfigManager {
}
@Override
public AbstractConsumerLoadBalancer consumerLoadBalancer() {
return consumerLoadBalancer;
public IConsumerLoadBalancer consumerLoadBalancer(ProtocolModule module) {
return consumerLoadBalancerMap.get(module.getName());
}
@Override
@@ -84,10 +91,12 @@ public class ConfigManager implements IConfigManager {
var module = ProtocolManager.moduleByModuleName(providerModule.getName());
AssertionUtils.isTrue(module != null, "消费者[name:{}]在协议文件中不存在", providerModule.getName());
module.setGroup(providerModule.getGroup());
module.setLoadBalancer(providerModule.getLoadBalancer());
consumerModules.add(module);
consumerLoadBalancerMap.put(module.getName(), AbstractConsumerLoadBalancer.valueOf(module.getLoadBalancer()));
}
consumerConfig.setModules(consumerModules);
consumerLoadBalancer = AbstractConsumerLoadBalancer.valueOf(consumerConfig.getLoadBalancer());
// consumerLoadBalancer = AbstractConsumerLoadBalancer.valueOf(consumerConfig.getLoadBalancer());
}
registry = new ZookeeperRegistry();
@@ -15,7 +15,9 @@ package com.zfoo.net.config.manager;
import com.zfoo.net.config.model.NetConfig;
import com.zfoo.net.consumer.balancer.AbstractConsumerLoadBalancer;
import com.zfoo.net.consumer.balancer.IConsumerLoadBalancer;
import com.zfoo.net.consumer.registry.IRegistry;
import com.zfoo.protocol.registration.ProtocolModule;
/**
* @author jaysunxiao
@@ -25,7 +27,7 @@ public interface IConfigManager {
NetConfig getLocalConfig();
AbstractConsumerLoadBalancer consumerLoadBalancer();
IConsumerLoadBalancer consumerLoadBalancer(ProtocolModule module);
void initRegistry();
@@ -25,31 +25,14 @@ import java.util.Objects;
*/
public class ConsumerConfig {
private String loadBalancer;
private List<ProtocolModule> modules;
public static ConsumerConfig valueOf(String loadBalancer, List<ProtocolModule> modules) {
ConsumerConfig config = new ConsumerConfig();
config.loadBalancer = loadBalancer;
config.modules = modules;
return config;
}
public static ConsumerConfig valueOf(List<ProtocolModule> modules) {
ConsumerConfig config = new ConsumerConfig();
config.modules = modules;
return config;
}
public String getLoadBalancer() {
return loadBalancer;
}
public void setLoadBalancer(String loadBalancer) {
this.loadBalancer = loadBalancer;
}
public List<ProtocolModule> getModules() {
return modules;
}
@@ -25,6 +25,7 @@ import com.zfoo.net.router.exception.NetTimeOutException;
import com.zfoo.net.router.exception.UnexpectedProtocolException;
import com.zfoo.net.router.route.SignalBridge;
import com.zfoo.protocol.IPacket;
import com.zfoo.protocol.ProtocolManager;
import com.zfoo.protocol.util.JsonUtils;
import com.zfoo.protocol.util.StringUtils;
import com.zfoo.util.math.HashUtils;
@@ -51,7 +52,8 @@ public class Consumer implements IConsumer {
@Override
public void send(IPacket packet, Object argument) {
try {
var loadBalancer = NetContext.getConfigManager().consumerLoadBalancer();
var module = ProtocolManager.moduleByProtocolId(packet.protocolId());
var loadBalancer = NetContext.getConfigManager().consumerLoadBalancer(module);
var session = loadBalancer.loadBalancer(packet, argument);
var executorConsistentHash = (argument == null) ? RandomUtils.randomInt() : HashUtils.fnvHash(argument);
NetContext.getRouter().send(session, packet, NoAnswerAttachment.valueOf(executorConsistentHash));
@@ -62,7 +64,8 @@ public class Consumer implements IConsumer {
@Override
public <T extends IPacket> SyncAnswer<T> syncAsk(IPacket packet, Class<T> answerClass, Object argument) throws Exception {
var loadBalancer = NetContext.getConfigManager().consumerLoadBalancer();
var module = ProtocolManager.moduleByProtocolId(packet.protocolId());
var loadBalancer = NetContext.getConfigManager().consumerLoadBalancer(module);
var session = loadBalancer.loadBalancer(packet, argument);
@@ -103,7 +106,8 @@ public class Consumer implements IConsumer {
@Override
public <T extends IPacket> AsyncAnswer<T> asyncAsk(IPacket packet, Class<T> answerClass, Object argument) {
var loadBalancer = NetContext.getConfigManager().consumerLoadBalancer();
var module = ProtocolManager.moduleByProtocolId(packet.protocolId());
var loadBalancer = NetContext.getConfigManager().consumerLoadBalancer(module);
var session = loadBalancer.loadBalancer(packet, argument);
var asyncAnswer = NetContext.getRouter().asyncAsk(session, packet, answerClass, argument);
@@ -168,8 +168,6 @@ public class NetDefinitionParser implements BeanDefinitionParser {
var clazz = ConsumerConfig.class;
var builder = BeanDefinitionBuilder.rootBeanDefinition(clazz);
resolvePlaceholder("load-balancer", "loadBalancer", builder, element, parserContext);
var consumerModules = parseModules("consumer", element, parserContext);
builder.addPropertyValue("modules", consumerModules);
parserContext.getRegistry().registerBeanDefinition(clazz.getCanonicalName(), builder.getBeanDefinition());
@@ -185,6 +183,7 @@ public class NetDefinitionParser implements BeanDefinitionParser {
var builder = BeanDefinitionBuilder.rootBeanDefinition(clazz);
builder.addConstructorArgValue(environment.resolvePlaceholders(addressElement.getAttribute("name")));
builder.addConstructorArgValue(environment.resolvePlaceholders(addressElement.getAttribute("load-balancer")));
builder.addConstructorArgValue(environment.resolvePlaceholders(addressElement.getAttribute("group")));
modules.add(new BeanDefinitionHolder(builder.getBeanDefinition(), StringUtils.format("{}.{}{}", clazz.getCanonicalName(), param, i)));
+1 -1
View File
@@ -41,11 +41,11 @@
<xsd:sequence>
<xsd:element name="module" maxOccurs="unbounded" type="moduleAttributeType"/>
</xsd:sequence>
<xsd:attribute name="load-balancer" type="xsd:string" use="required"/>
</xsd:complexType>
<xsd:complexType name="moduleAttributeType">
<xsd:attribute name="name" type="xsd:string" use="required"/>
<xsd:attribute name="load-balancer" type="xsd:string" default="consistent-hash"/>
<xsd:attribute name="group" type="xsd:string" default="0"/>
</xsd:complexType>
@@ -149,5 +149,25 @@ public class ProviderTest {
ThreadUtils.sleep(Long.MAX_VALUE);
}
/**
* 固定消费方式
*/
@Test
public void startFixedConsumer() {
var context = new ClassPathXmlApplicationContext("provider/consumer_fixed_config.xml");
SessionUtils.printSessionInfo();
var ask = new ProviderMessAsk();
ask.setMessage("Hello, this is the consumer!");
var atomicInteger = new AtomicInteger(0);
for (int i = 0; i < 1000; i++) {
ThreadUtils.sleep(3000);
NetContext.getConsumer().asyncAsk(ask, ProviderMessAnswer.class, 0).whenComplete(answer -> {
logger.info("消费者请求[{}]收到消息[{}]", atomicInteger.incrementAndGet(), JsonUtils.object2String(answer));
});
}
ThreadUtils.sleep(Long.MAX_VALUE);
}
}
@@ -25,8 +25,8 @@
<net:address name="${registry.address.name}" url="${registry.address.url}"/>
</net:registry>
<net:consumer load-balancer="consistent-hash">
<net:module name="providerTest"/>
<net:consumer>
<net:module name="providerTest" load-balancer="consistent-hash"/>
</net:consumer>
</net:config>
@@ -0,0 +1,34 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:net="http://www.zfoo.com/schema/net"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.zfoo.com/schema/net
http://www.zfoo.com/schema/net-1.0.xsd">
<context:property-placeholder location="classpath:deploy-dev.properties"/>
<context:component-scan base-package="com.zfoo"/>
<net:config id="applicationNameTest" protocol-location="protocol.xml">
<net:registry center="${registry.center}" user="${registry.user}" password="${registry.password}">
<net:address name="${registry.address.name}" url="${registry.address.url}"/>
</net:registry>
<net:consumer>
<net:module name="providerTest" load-balancer="fixed"/>
</net:consumer>
</net:config>
</beans>
@@ -24,8 +24,8 @@
<net:address name="${registry.address.name}" url="${registry.address.url}"/>
</net:registry>
<net:consumer load-balancer="random">
<net:module name="providerTest"/>
<net:consumer>
<net:module name="providerTest" load-balancer="random"/>
</net:consumer>
</net:config>
@@ -25,8 +25,8 @@
</net:registry>
<net:consumer load-balancer="shortest-time">
<net:module name="providerTest"/>
<net:consumer>
<net:module name="providerTest" load-balancer="shortest-time"/>
</net:consumer>
</net:config>
@@ -32,6 +32,8 @@ public class ProtocolModule {
*/
private int version;
private String loadBalancer;
private int group;
private transient int hash;
@@ -56,8 +58,9 @@ public class ProtocolModule {
this.perfectHash();
}
public ProtocolModule(String name, String group) {
public ProtocolModule(String name, String loadBalancer, String group) {
this.name = name;
this.loadBalancer = loadBalancer;
this.group = Integer.parseInt(group);
}
@@ -114,6 +117,14 @@ public class ProtocolModule {
this.version = version;
}
public String getLoadBalancer() {
return loadBalancer;
}
public void setLoadBalancer(String loadBalancer) {
this.loadBalancer = loadBalancer;
}
public int getGroup() {
return group;
}