From a7741f0fc2a612ae5984e4282270f9bf41fa9bf3 Mon Sep 17 00:00:00 2001 From: meiwei <309921330@qq.com> Date: Fri, 1 Jul 2022 12:12:48 +0800 Subject: [PATCH] =?UTF-8?q?perf[net]:=20=E4=BC=98=E5=8C=96=E4=BA=86comsume?= =?UTF-8?q?r=E6=B6=88=E8=B4=B9=E6=96=B9=E5=BC=8F=E9=80=89=E6=8B=A9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../net/config/manager/ConfigManager.java | 15 ++++++-- .../net/config/manager/IConfigManager.java | 4 ++- .../zfoo/net/config/model/ConsumerConfig.java | 17 ---------- .../java/com/zfoo/net/consumer/Consumer.java | 10 ++++-- .../zfoo/net/schema/NetDefinitionParser.java | 3 +- net/src/main/resources/net-1.0.xsd | 2 +- .../zfoo/net/core/provider/ProviderTest.java | 20 +++++++++++ .../consumer_consistent_session_config.xml | 4 +-- .../provider/consumer_fixed_config.xml | 34 +++++++++++++++++++ .../provider/consumer_random_config.xml | 4 +-- .../consumer_shortest_time_config.xml | 4 +-- .../protocol/registration/ProtocolModule.java | 13 ++++++- 12 files changed, 96 insertions(+), 34 deletions(-) create mode 100644 net/src/test/resources/provider/consumer_fixed_config.xml diff --git a/net/src/main/java/com/zfoo/net/config/manager/ConfigManager.java b/net/src/main/java/com/zfoo/net/config/manager/ConfigManager.java index 24139ff2..44070411 100644 --- a/net/src/main/java/com/zfoo/net/config/manager/ConfigManager.java +++ b/net/src/main/java/com/zfoo/net/config/manager/ConfigManager.java @@ -15,8 +15,10 @@ package com.zfoo.net.config.manager; import com.zfoo.net.config.model.NetConfig; import com.zfoo.net.consumer.balancer.AbstractConsumerLoadBalancer; +import com.zfoo.net.consumer.balancer.IConsumerLoadBalancer; import com.zfoo.net.consumer.registry.IRegistry; import com.zfoo.net.consumer.registry.ZookeeperRegistry; +import com.zfoo.net.session.model.Session; import com.zfoo.protocol.ProtocolManager; import com.zfoo.protocol.collection.CollectionUtils; import com.zfoo.protocol.registration.ProtocolModule; @@ -25,7 +27,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; /** * @author jaysunxiao @@ -42,6 +47,8 @@ public class ConfigManager implements IConfigManager { private AbstractConsumerLoadBalancer consumerLoadBalancer; + private final Map consumerLoadBalancerMap = new ConcurrentHashMap<>(); + /** * 注册中心 */ @@ -57,8 +64,8 @@ public class ConfigManager implements IConfigManager { } @Override - public AbstractConsumerLoadBalancer consumerLoadBalancer() { - return consumerLoadBalancer; + public IConsumerLoadBalancer consumerLoadBalancer(ProtocolModule module) { + return consumerLoadBalancerMap.get(module.getName()); } @Override @@ -84,10 +91,12 @@ public class ConfigManager implements IConfigManager { var module = ProtocolManager.moduleByModuleName(providerModule.getName()); AssertionUtils.isTrue(module != null, "消费者[name:{}]在协议文件中不存在", providerModule.getName()); module.setGroup(providerModule.getGroup()); + module.setLoadBalancer(providerModule.getLoadBalancer()); consumerModules.add(module); + consumerLoadBalancerMap.put(module.getName(), AbstractConsumerLoadBalancer.valueOf(module.getLoadBalancer())); } consumerConfig.setModules(consumerModules); - consumerLoadBalancer = AbstractConsumerLoadBalancer.valueOf(consumerConfig.getLoadBalancer()); +// consumerLoadBalancer = AbstractConsumerLoadBalancer.valueOf(consumerConfig.getLoadBalancer()); } registry = new ZookeeperRegistry(); diff --git a/net/src/main/java/com/zfoo/net/config/manager/IConfigManager.java b/net/src/main/java/com/zfoo/net/config/manager/IConfigManager.java index 48a8e713..256c168e 100644 --- a/net/src/main/java/com/zfoo/net/config/manager/IConfigManager.java +++ b/net/src/main/java/com/zfoo/net/config/manager/IConfigManager.java @@ -15,7 +15,9 @@ package com.zfoo.net.config.manager; import com.zfoo.net.config.model.NetConfig; import com.zfoo.net.consumer.balancer.AbstractConsumerLoadBalancer; +import com.zfoo.net.consumer.balancer.IConsumerLoadBalancer; import com.zfoo.net.consumer.registry.IRegistry; +import com.zfoo.protocol.registration.ProtocolModule; /** * @author jaysunxiao @@ -25,7 +27,7 @@ public interface IConfigManager { NetConfig getLocalConfig(); - AbstractConsumerLoadBalancer consumerLoadBalancer(); + IConsumerLoadBalancer consumerLoadBalancer(ProtocolModule module); void initRegistry(); diff --git a/net/src/main/java/com/zfoo/net/config/model/ConsumerConfig.java b/net/src/main/java/com/zfoo/net/config/model/ConsumerConfig.java index 6407c7a3..a5dfd33a 100644 --- a/net/src/main/java/com/zfoo/net/config/model/ConsumerConfig.java +++ b/net/src/main/java/com/zfoo/net/config/model/ConsumerConfig.java @@ -25,31 +25,14 @@ import java.util.Objects; */ public class ConsumerConfig { - private String loadBalancer; - private List modules; - public static ConsumerConfig valueOf(String loadBalancer, List modules) { - ConsumerConfig config = new ConsumerConfig(); - config.loadBalancer = loadBalancer; - config.modules = modules; - return config; - } - public static ConsumerConfig valueOf(List modules) { ConsumerConfig config = new ConsumerConfig(); config.modules = modules; return config; } - public String getLoadBalancer() { - return loadBalancer; - } - - public void setLoadBalancer(String loadBalancer) { - this.loadBalancer = loadBalancer; - } - public List getModules() { return modules; } diff --git a/net/src/main/java/com/zfoo/net/consumer/Consumer.java b/net/src/main/java/com/zfoo/net/consumer/Consumer.java index aacc7987..68f9c0a5 100644 --- a/net/src/main/java/com/zfoo/net/consumer/Consumer.java +++ b/net/src/main/java/com/zfoo/net/consumer/Consumer.java @@ -25,6 +25,7 @@ import com.zfoo.net.router.exception.NetTimeOutException; import com.zfoo.net.router.exception.UnexpectedProtocolException; import com.zfoo.net.router.route.SignalBridge; import com.zfoo.protocol.IPacket; +import com.zfoo.protocol.ProtocolManager; import com.zfoo.protocol.util.JsonUtils; import com.zfoo.protocol.util.StringUtils; import com.zfoo.util.math.HashUtils; @@ -51,7 +52,8 @@ public class Consumer implements IConsumer { @Override public void send(IPacket packet, Object argument) { try { - var loadBalancer = NetContext.getConfigManager().consumerLoadBalancer(); + var module = ProtocolManager.moduleByProtocolId(packet.protocolId()); + var loadBalancer = NetContext.getConfigManager().consumerLoadBalancer(module); var session = loadBalancer.loadBalancer(packet, argument); var executorConsistentHash = (argument == null) ? RandomUtils.randomInt() : HashUtils.fnvHash(argument); NetContext.getRouter().send(session, packet, NoAnswerAttachment.valueOf(executorConsistentHash)); @@ -62,7 +64,8 @@ public class Consumer implements IConsumer { @Override public SyncAnswer syncAsk(IPacket packet, Class answerClass, Object argument) throws Exception { - var loadBalancer = NetContext.getConfigManager().consumerLoadBalancer(); + var module = ProtocolManager.moduleByProtocolId(packet.protocolId()); + var loadBalancer = NetContext.getConfigManager().consumerLoadBalancer(module); var session = loadBalancer.loadBalancer(packet, argument); @@ -103,7 +106,8 @@ public class Consumer implements IConsumer { @Override public AsyncAnswer asyncAsk(IPacket packet, Class answerClass, Object argument) { - var loadBalancer = NetContext.getConfigManager().consumerLoadBalancer(); + var module = ProtocolManager.moduleByProtocolId(packet.protocolId()); + var loadBalancer = NetContext.getConfigManager().consumerLoadBalancer(module); var session = loadBalancer.loadBalancer(packet, argument); var asyncAnswer = NetContext.getRouter().asyncAsk(session, packet, answerClass, argument); diff --git a/net/src/main/java/com/zfoo/net/schema/NetDefinitionParser.java b/net/src/main/java/com/zfoo/net/schema/NetDefinitionParser.java index 2ae74a03..41d89649 100644 --- a/net/src/main/java/com/zfoo/net/schema/NetDefinitionParser.java +++ b/net/src/main/java/com/zfoo/net/schema/NetDefinitionParser.java @@ -168,8 +168,6 @@ public class NetDefinitionParser implements BeanDefinitionParser { var clazz = ConsumerConfig.class; var builder = BeanDefinitionBuilder.rootBeanDefinition(clazz); - resolvePlaceholder("load-balancer", "loadBalancer", builder, element, parserContext); - var consumerModules = parseModules("consumer", element, parserContext); builder.addPropertyValue("modules", consumerModules); parserContext.getRegistry().registerBeanDefinition(clazz.getCanonicalName(), builder.getBeanDefinition()); @@ -185,6 +183,7 @@ public class NetDefinitionParser implements BeanDefinitionParser { var builder = BeanDefinitionBuilder.rootBeanDefinition(clazz); builder.addConstructorArgValue(environment.resolvePlaceholders(addressElement.getAttribute("name"))); + builder.addConstructorArgValue(environment.resolvePlaceholders(addressElement.getAttribute("load-balancer"))); builder.addConstructorArgValue(environment.resolvePlaceholders(addressElement.getAttribute("group"))); modules.add(new BeanDefinitionHolder(builder.getBeanDefinition(), StringUtils.format("{}.{}{}", clazz.getCanonicalName(), param, i))); diff --git a/net/src/main/resources/net-1.0.xsd b/net/src/main/resources/net-1.0.xsd index 6bded9d0..857c9ca9 100644 --- a/net/src/main/resources/net-1.0.xsd +++ b/net/src/main/resources/net-1.0.xsd @@ -41,11 +41,11 @@ - + diff --git a/net/src/test/java/com/zfoo/net/core/provider/ProviderTest.java b/net/src/test/java/com/zfoo/net/core/provider/ProviderTest.java index 5f09dffd..eb40145c 100644 --- a/net/src/test/java/com/zfoo/net/core/provider/ProviderTest.java +++ b/net/src/test/java/com/zfoo/net/core/provider/ProviderTest.java @@ -149,5 +149,25 @@ public class ProviderTest { ThreadUtils.sleep(Long.MAX_VALUE); } + /** + * 固定消费方式 + */ + @Test + public void startFixedConsumer() { + var context = new ClassPathXmlApplicationContext("provider/consumer_fixed_config.xml"); + SessionUtils.printSessionInfo(); + var ask = new ProviderMessAsk(); + ask.setMessage("Hello, this is the consumer!"); + var atomicInteger = new AtomicInteger(0); + + for (int i = 0; i < 1000; i++) { + ThreadUtils.sleep(3000); + NetContext.getConsumer().asyncAsk(ask, ProviderMessAnswer.class, 0).whenComplete(answer -> { + logger.info("消费者请求[{}]收到消息[{}]", atomicInteger.incrementAndGet(), JsonUtils.object2String(answer)); + }); + } + + ThreadUtils.sleep(Long.MAX_VALUE); + } } diff --git a/net/src/test/resources/provider/consumer_consistent_session_config.xml b/net/src/test/resources/provider/consumer_consistent_session_config.xml index ac20d749..55c7cdfa 100644 --- a/net/src/test/resources/provider/consumer_consistent_session_config.xml +++ b/net/src/test/resources/provider/consumer_consistent_session_config.xml @@ -25,8 +25,8 @@ - - + + diff --git a/net/src/test/resources/provider/consumer_fixed_config.xml b/net/src/test/resources/provider/consumer_fixed_config.xml new file mode 100644 index 00000000..4395ad7e --- /dev/null +++ b/net/src/test/resources/provider/consumer_fixed_config.xml @@ -0,0 +1,34 @@ + + + + + + + + + + + + + + + + + + + + + diff --git a/net/src/test/resources/provider/consumer_random_config.xml b/net/src/test/resources/provider/consumer_random_config.xml index 680fbcbf..956f5d47 100644 --- a/net/src/test/resources/provider/consumer_random_config.xml +++ b/net/src/test/resources/provider/consumer_random_config.xml @@ -24,8 +24,8 @@ - - + + diff --git a/net/src/test/resources/provider/consumer_shortest_time_config.xml b/net/src/test/resources/provider/consumer_shortest_time_config.xml index 64d3dc29..637355a5 100644 --- a/net/src/test/resources/provider/consumer_shortest_time_config.xml +++ b/net/src/test/resources/provider/consumer_shortest_time_config.xml @@ -25,8 +25,8 @@ - - + + diff --git a/protocol/src/main/java/com/zfoo/protocol/registration/ProtocolModule.java b/protocol/src/main/java/com/zfoo/protocol/registration/ProtocolModule.java index 311e4693..d2b7fd74 100644 --- a/protocol/src/main/java/com/zfoo/protocol/registration/ProtocolModule.java +++ b/protocol/src/main/java/com/zfoo/protocol/registration/ProtocolModule.java @@ -32,6 +32,8 @@ public class ProtocolModule { */ private int version; + private String loadBalancer; + private int group; private transient int hash; @@ -56,8 +58,9 @@ public class ProtocolModule { this.perfectHash(); } - public ProtocolModule(String name, String group) { + public ProtocolModule(String name, String loadBalancer, String group) { this.name = name; + this.loadBalancer = loadBalancer; this.group = Integer.parseInt(group); } @@ -114,6 +117,14 @@ public class ProtocolModule { this.version = version; } + public String getLoadBalancer() { + return loadBalancer; + } + + public void setLoadBalancer(String loadBalancer) { + this.loadBalancer = loadBalancer; + } + public int getGroup() { return group; }