From b2c0bd307301a0196a4f65f028bbbd69e4955436 Mon Sep 17 00:00:00 2001 From: jaysunxiao Date: Sat, 2 Jul 2022 10:50:03 +0800 Subject: [PATCH] =?UTF-8?q?perf[net]:=20=E9=87=8D=E6=9E=84net=EF=BC=8C?= =?UTF-8?q?=E6=8A=8A=E6=A8=A1=E5=9D=97=E5=8F=B7=E6=B7=BB=E5=8A=A0=E5=88=B0?= =?UTF-8?q?=E6=B3=A8=E5=86=8C=E5=88=97=E8=A1=A8=E4=B8=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../main/java/com/zfoo/net/NetContext.java | 1 + .../java/com/zfoo/net/consumer/Consumer.java | 34 +++++------ .../java/com/zfoo/net/consumer/IConsumer.java | 2 + .../AbstractConsumerLoadBalancer.java | 2 +- .../net/consumer/registry/RegisterVO.java | 14 ++--- .../consumer/registry/ZookeeperRegistry.java | 39 ++++++------- .../com/zfoo/net/config/RegistryTest.java | 58 +++++++++++++++++++ net/src/test/resources/protocol.xml | 10 ++-- 8 files changed, 106 insertions(+), 54 deletions(-) create mode 100644 net/src/test/java/com/zfoo/net/config/RegistryTest.java diff --git a/net/src/main/java/com/zfoo/net/NetContext.java b/net/src/main/java/com/zfoo/net/NetContext.java index 58224eaa..5fdfa130 100644 --- a/net/src/main/java/com/zfoo/net/NetContext.java +++ b/net/src/main/java/com/zfoo/net/NetContext.java @@ -107,6 +107,7 @@ public class NetContext implements ApplicationListener, instance.packetService.init(); instance.configManager.initRegistry(); + instance.consumer.init(); logger.info("Net started successfully and cost [{}] seconds", stopWatch.costSeconds()); } else if (event instanceof ContextClosedEvent) { diff --git a/net/src/main/java/com/zfoo/net/consumer/Consumer.java b/net/src/main/java/com/zfoo/net/consumer/Consumer.java index 4cba81d3..172fc447 100644 --- a/net/src/main/java/com/zfoo/net/consumer/Consumer.java +++ b/net/src/main/java/com/zfoo/net/consumer/Consumer.java @@ -16,7 +16,6 @@ package com.zfoo.net.consumer; import com.zfoo.net.NetContext; import com.zfoo.net.consumer.balancer.AbstractConsumerLoadBalancer; import com.zfoo.net.consumer.balancer.IConsumerLoadBalancer; -import com.zfoo.net.consumer.registry.RegisterVO; import com.zfoo.net.packet.common.Error; import com.zfoo.net.router.Router; import com.zfoo.net.router.answer.AsyncAnswer; @@ -27,12 +26,9 @@ import com.zfoo.net.router.exception.ErrorResponseException; import com.zfoo.net.router.exception.NetTimeOutException; import com.zfoo.net.router.exception.UnexpectedProtocolException; import com.zfoo.net.router.route.SignalBridge; -import com.zfoo.net.session.model.AttributeType; -import com.zfoo.net.session.model.Session; import com.zfoo.protocol.IPacket; import com.zfoo.protocol.ProtocolManager; import com.zfoo.protocol.collection.CollectionUtils; -import com.zfoo.protocol.exception.RunException; import com.zfoo.protocol.registration.ProtocolModule; import com.zfoo.protocol.util.JsonUtils; import com.zfoo.protocol.util.StringUtils; @@ -41,8 +37,8 @@ import com.zfoo.util.math.RandomUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.List; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -58,27 +54,29 @@ public class Consumer implements IConsumer { private static final Logger logger = LoggerFactory.getLogger(Consumer.class); + private final Map consumerLoadBalancerMap = new HashMap<>(); + @Override - public IConsumerLoadBalancer loadBalancer(ProtocolModule protocolModule) { + public void init() { var consumerConfig = NetContext.getConfigManager().getLocalConfig().getConsumer(); if (consumerConfig == null || CollectionUtils.isEmpty(consumerConfig.getConsumers())) { - throw new RunException("没有配置服务消费者,无法消费"); + return; } - var consumers = consumerConfig.getConsumers(); for (var consumer : consumers) { - if (consumer.getProtocolModule().equals(protocolModule)) { - return AbstractConsumerLoadBalancer.valueOf(consumer.getLoadBalancer()); - } + consumerLoadBalancerMap.put(consumer.getProtocolModule(), AbstractConsumerLoadBalancer.valueOf(consumer.getLoadBalancer())); } - return null; + } + + @Override + public IConsumerLoadBalancer loadBalancer(ProtocolModule protocolModule) { + return consumerLoadBalancerMap.get(protocolModule); } @Override public void send(IPacket packet, Object argument) { try { - var module = ProtocolManager.moduleByProtocolId(packet.protocolId()); - var loadBalancer = loadBalancer(module); + var loadBalancer = loadBalancer(ProtocolManager.moduleByProtocolId(packet.protocolId())); var session = loadBalancer.loadBalancer(packet, argument); var executorConsistentHash = (argument == null) ? RandomUtils.randomInt() : HashUtils.fnvHash(argument); NetContext.getRouter().send(session, packet, NoAnswerAttachment.valueOf(executorConsistentHash)); @@ -89,8 +87,7 @@ public class Consumer implements IConsumer { @Override public SyncAnswer syncAsk(IPacket packet, Class answerClass, Object argument) throws Exception { - var module = ProtocolManager.moduleByProtocolId(packet.protocolId()); - var loadBalancer = loadBalancer(module); + var loadBalancer = loadBalancer(ProtocolManager.moduleByProtocolId(packet.protocolId())); var session = loadBalancer.loadBalancer(packet, argument); @@ -131,8 +128,7 @@ public class Consumer implements IConsumer { @Override public AsyncAnswer asyncAsk(IPacket packet, Class answerClass, Object argument) { - var module = ProtocolManager.moduleByProtocolId(packet.protocolId()); - var loadBalancer = loadBalancer(module); + var loadBalancer = loadBalancer(ProtocolManager.moduleByProtocolId(packet.protocolId())); var session = loadBalancer.loadBalancer(packet, argument); var asyncAnswer = NetContext.getRouter().asyncAsk(session, packet, answerClass, argument); diff --git a/net/src/main/java/com/zfoo/net/consumer/IConsumer.java b/net/src/main/java/com/zfoo/net/consumer/IConsumer.java index d1449963..eb2c5cb7 100644 --- a/net/src/main/java/com/zfoo/net/consumer/IConsumer.java +++ b/net/src/main/java/com/zfoo/net/consumer/IConsumer.java @@ -26,6 +26,8 @@ import org.springframework.lang.Nullable; */ public interface IConsumer { + void init(); + IConsumerLoadBalancer loadBalancer(ProtocolModule protocolModule); /** diff --git a/net/src/main/java/com/zfoo/net/consumer/balancer/AbstractConsumerLoadBalancer.java b/net/src/main/java/com/zfoo/net/consumer/balancer/AbstractConsumerLoadBalancer.java index 6c0319bf..6ee263e3 100644 --- a/net/src/main/java/com/zfoo/net/consumer/balancer/AbstractConsumerLoadBalancer.java +++ b/net/src/main/java/com/zfoo/net/consumer/balancer/AbstractConsumerLoadBalancer.java @@ -63,7 +63,7 @@ public abstract class AbstractConsumerLoadBalancer implements IConsumerLoadBalan var attribute = it.getAttribute(AttributeType.CONSUMER); if (Objects.nonNull(attribute)) { var registerVO = (RegisterVO) attribute; - return Objects.nonNull(registerVO.getProviderConfig()) && registerVO.getProviderConfig().getProviders().stream().anyMatch(provider -> provider.getProtocolModule().getId() == module.getId()); + return Objects.nonNull(registerVO.getProviderConfig()) && registerVO.getProviderConfig().getProviders().stream().anyMatch(provider -> provider.getProtocolModule().equals(module)); } else { return false; } diff --git a/net/src/main/java/com/zfoo/net/consumer/registry/RegisterVO.java b/net/src/main/java/com/zfoo/net/consumer/registry/RegisterVO.java index 16f1d183..4b64bfbc 100644 --- a/net/src/main/java/com/zfoo/net/consumer/registry/RegisterVO.java +++ b/net/src/main/java/com/zfoo/net/consumer/registry/RegisterVO.java @@ -17,9 +17,9 @@ import com.zfoo.net.config.model.ConsumerConfig; import com.zfoo.net.config.model.ConsumerModule; import com.zfoo.net.config.model.ProviderConfig; import com.zfoo.net.config.model.ProviderModule; -import com.zfoo.protocol.ProtocolManager; import com.zfoo.protocol.collection.CollectionUtils; import com.zfoo.protocol.exception.ExceptionUtils; +import com.zfoo.protocol.registration.ProtocolModule; import com.zfoo.protocol.util.StringUtils; import com.zfoo.util.security.IdUtils; import org.slf4j.Logger; @@ -46,13 +46,13 @@ public class RegisterVO { private ConsumerConfig consumerConfig; - public static boolean providerHasConsumerModule(RegisterVO providerVO, RegisterVO consumerVO) { + public static boolean providerHasConsumer(RegisterVO providerVO, RegisterVO consumerVO) { if (Objects.isNull(providerVO) || Objects.isNull(providerVO.providerConfig) || CollectionUtils.isEmpty(providerVO.providerConfig.getProviders()) || Objects.isNull(consumerVO) || Objects.isNull(consumerVO.consumerConfig) || CollectionUtils.isEmpty(consumerVO.consumerConfig.getConsumers())) { return false; } for (var provider : providerVO.getProviderConfig().getProviders()) { - if (consumerVO.getConsumerConfig().getConsumers().stream().anyMatch(it -> it.getConsumer().equals(provider.getProvider()))) { + if (consumerVO.getConsumerConfig().getConsumers().stream().anyMatch(it -> provider.getProvider().equals(it.getConsumer()))) { return true; } } @@ -106,7 +106,7 @@ public class RegisterVO { var modules = Arrays.stream(moduleSplits) .map(it -> it.trim()) .map(it -> it.split(StringUtils.HYPHEN)) - .map(it -> new ProviderModule(ProtocolManager.moduleByModuleName(StringUtils.trim(it[0])), StringUtils.trim(it[1]))) + .map(it -> new ProviderModule(new ProtocolModule(Byte.parseByte(it[0]), it[1]), it[2])) .collect(Collectors.toList()); return modules; } @@ -119,7 +119,7 @@ public class RegisterVO { var modules = Arrays.stream(moduleSplits) .map(it -> it.trim()) .map(it -> it.split(StringUtils.HYPHEN)) - .map(it -> new ConsumerModule(ProtocolManager.moduleByModuleName(StringUtils.trim(it[0])), StringUtils.trim(it[1]), StringUtils.trim(it[2]))) + .map(it -> new ConsumerModule(new ProtocolModule(Byte.parseByte(it[0]), it[1]), it[2], it[3])) .collect(Collectors.toList()); return modules; } @@ -149,7 +149,7 @@ public class RegisterVO { builder.append(StringUtils.SPACE).append(StringUtils.VERTICAL_BAR).append(StringUtils.SPACE); var providerModules = providerConfig.getProviders().stream() - .map(it -> StringUtils.joinWith(StringUtils.HYPHEN, it.getProtocolModule().getName(), it.getProvider())) + .map(it -> StringUtils.joinWith(StringUtils.HYPHEN, it.getProtocolModule().getId(), it.getProtocolModule().getName(), it.getProvider())) .collect(Collectors.toList()); builder.append(StringUtils.format("provider:[{}]" , StringUtils.joinWith(StringUtils.COMMA + StringUtils.SPACE, providerModules.toArray()))); @@ -159,7 +159,7 @@ public class RegisterVO { builder.append(StringUtils.SPACE).append(StringUtils.VERTICAL_BAR).append(StringUtils.SPACE); var consumerModules = consumerConfig.getConsumers().stream() - .map(it -> StringUtils.joinWith(StringUtils.HYPHEN, it.getProtocolModule().getName(), it.getLoadBalancer(), it.getConsumer())) + .map(it -> StringUtils.joinWith(StringUtils.HYPHEN, it.getProtocolModule().getId(), it.getProtocolModule().getName(), it.getLoadBalancer(), it.getConsumer())) .collect(Collectors.toList()); builder.append(StringUtils.format("consumer:[{}]" , StringUtils.joinWith(StringUtils.COMMA + StringUtils.SPACE, consumerModules.toArray()))); diff --git a/net/src/main/java/com/zfoo/net/consumer/registry/ZookeeperRegistry.java b/net/src/main/java/com/zfoo/net/consumer/registry/ZookeeperRegistry.java index 456f8eda..c1474c79 100644 --- a/net/src/main/java/com/zfoo/net/consumer/registry/ZookeeperRegistry.java +++ b/net/src/main/java/com/zfoo/net/consumer/registry/ZookeeperRegistry.java @@ -112,13 +112,7 @@ public class ZookeeperRegistry implements IRegistry { /** * consumer需要消费的provider集合 */ - private final Set providerCacheSet = new ConcurrentHashSet<>(); - /** - * 本地注册信息 - */ - private final RegisterVO localRegisterVO = NetContext.getConfigManager().getLocalConfig().toLocalRegisterVO(); - - + private final Set providerHashConsumerSet = new ConcurrentHashSet<>(); /** * addListener中的cache全部会被添加到这个集合中,这个集合不包括providerCuratorCache */ @@ -282,8 +276,9 @@ public class ZookeeperRegistry implements IRegistry { case NODE_CREATED: var providerStr = StringUtils.substringAfterFirst(newData.getPath(), PROVIDER_ROOT_PATH + StringUtils.SLASH); var provider = RegisterVO.parseString(providerStr); - if (RegisterVO.providerHasConsumerModule(provider, localRegisterVO)) { - providerCacheSet.add(provider); + var localRegisterVO = NetContext.getConfigManager().getLocalConfig().toLocalRegisterVO(); + if (RegisterVO.providerHasConsumer(provider, localRegisterVO)) { + providerHashConsumerSet.add(provider); checkConsumer(); logger.info("发现新的订阅服务[{}]", providerStr); } @@ -291,8 +286,8 @@ public class ZookeeperRegistry implements IRegistry { case NODE_DELETED: var oldProviderStr = StringUtils.substringAfterFirst(oldData.getPath(), PROVIDER_ROOT_PATH + StringUtils.SLASH); var oldProvider = RegisterVO.parseString(oldProviderStr); - if (providerCacheSet.contains(oldProvider)) { - providerCacheSet.remove(oldProvider); + if (providerHashConsumerSet.contains(oldProvider)) { + providerHashConsumerSet.remove(oldProvider); checkConsumer(); logger.info("取消订阅服务[{}]", oldProviderStr); } @@ -329,6 +324,7 @@ public class ZookeeperRegistry implements IRegistry { } private void initLocalProvider() throws Exception { + var localRegisterVO = NetContext.getConfigManager().getLocalConfig().toLocalRegisterVO(); if (Objects.nonNull(localRegisterVO.getProviderConfig())) { var localProviderVoStr = localRegisterVO.toProviderString(); var localProviderPath = PROVIDER_ROOT_PATH + StringUtils.SLASH + localProviderVoStr; @@ -357,16 +353,17 @@ public class ZookeeperRegistry implements IRegistry { } private void initConsumerCache() throws Exception { + var localRegisterVO = NetContext.getConfigManager().getLocalConfig().toLocalRegisterVO(); // 初始化providerCacheSet var remoteProviderSet = curator.getChildren().forPath(PROVIDER_ROOT_PATH).stream() .filter(it -> StringUtils.isNotBlank(it) && !"null".equals(it)) .map(it -> RegisterVO.parseString(it)) .filter(it -> Objects.nonNull(it)) - .filter(it -> RegisterVO.providerHasConsumerModule(it, localRegisterVO)) + .filter(it -> RegisterVO.providerHasConsumer(it, localRegisterVO)) .collect(Collectors.toSet()); - providerCacheSet.clear(); - providerCacheSet.addAll(remoteProviderSet); + providerHashConsumerSet.clear(); + providerHashConsumerSet.addAll(remoteProviderSet); // 初始化consumer,providerCacheSet改变会导致消费者改变 checkConsumer(); @@ -391,11 +388,12 @@ public class ZookeeperRegistry implements IRegistry { return; } - logger.info("开始通过providerCacheSet:{}检查[consumer:{}]", providerCacheSet, NetContext.getSessionManager().getClientSessionMap().size()); + logger.info("开始通过providerHashConsumerSet:{}检查[consumer:{}]", providerHashConsumerSet, NetContext.getSessionManager().getClientSessionMap().size()); var recheckFlag = false; - for (var providerCache : providerCacheSet) { + for (var providerCache : providerHashConsumerSet) { + // 先排除已经启动的consumer var consumerClientList = NetContext.getSessionManager().getClientSessionMap().values().stream() .filter(it -> { var attribute = it.getAttribute(AttributeType.CONSUMER); @@ -428,6 +426,7 @@ public class ZookeeperRegistry implements IRegistry { EventBus.asyncSubmit(ConsumerStartEvent.valueOf(providerCache, session)); try { + var localRegisterVO = NetContext.getConfigManager().getLocalConfig().toLocalRegisterVO(); var path = CONSUMER_ROOT_PATH + StringUtils.SLASH + localRegisterVO.toConsumerString(); var stat = curator.checkExists().forPath(path); if (Objects.isNull(stat)) { @@ -446,12 +445,7 @@ public class ZookeeperRegistry implements IRegistry { } if (recheckFlag) { - SchedulerBus.schedule(new Runnable() { - @Override - public void run() { - checkConsumer(); - } - }, RETRY_SECONDS, TimeUnit.SECONDS); + SchedulerBus.schedule(() -> checkConsumer(), RETRY_SECONDS, TimeUnit.SECONDS); } } @@ -582,6 +576,7 @@ public class ZookeeperRegistry implements IRegistry { return; } try { + var localRegisterVO = NetContext.getConfigManager().getLocalConfig().toLocalRegisterVO(); if (curator.getState() == CuratorFrameworkState.STARTED) { // 删除服务提供者的临时节点 if (Objects.nonNull(localRegisterVO.getProviderConfig())) { diff --git a/net/src/test/java/com/zfoo/net/config/RegistryTest.java b/net/src/test/java/com/zfoo/net/config/RegistryTest.java new file mode 100644 index 00000000..4efd5e0d --- /dev/null +++ b/net/src/test/java/com/zfoo/net/config/RegistryTest.java @@ -0,0 +1,58 @@ +/* + * Copyright (C) 2020 The zfoo Authors + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + */ + +package com.zfoo.net.config; + +import com.zfoo.net.config.model.ConsumerConfig; +import com.zfoo.net.config.model.ConsumerModule; +import com.zfoo.net.config.model.ProviderConfig; +import com.zfoo.net.config.model.ProviderModule; +import com.zfoo.net.consumer.registry.RegisterVO; +import com.zfoo.protocol.registration.ProtocolModule; +import com.zfoo.util.net.HostAndPort; +import io.netty.util.NetUtil; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; + +/** + * @author jaysunxiao + * @version 3.0 + */ +public class RegistryTest { + + @Test + public void registerVoTest() { + var protocolModule1 = new ProtocolModule((byte) 100, "aaa"); + var protocolModule2 = new ProtocolModule((byte) 120, "bbb"); + var providerModules = List.of(new ProviderModule(protocolModule1, "a"), new ProviderModule(protocolModule2, "b")); + var providerConfig = ProviderConfig.valueOf(HostAndPort.valueOf("127.0.0.1", 80).toHostAndPortStr(), providerModules); + + var consumerModules = List.of(new ConsumerModule(protocolModule1, "random", "a"), new ConsumerModule(protocolModule2, "random", "b")); + var consumerConfig = ConsumerConfig.valueOf(consumerModules); + + var vo = RegisterVO.valueOf("test", providerConfig, consumerConfig); + var voStr = vo.toString(); + System.out.println(voStr); + var newVo = RegisterVO.parseString(voStr); + Assert.assertEquals(vo, newVo); + + System.out.println(NetUtil.LOCALHOST); + System.out.println(NetUtil.LOCALHOST4); + System.out.println(NetUtil.LOCALHOST6); + System.out.println(NetUtil.SOMAXCONN); + System.out.println(NetUtil.LOOPBACK_IF); + } + +} diff --git a/net/src/test/resources/protocol.xml b/net/src/test/resources/protocol.xml index 374c24c6..be92981a 100644 --- a/net/src/test/resources/protocol.xml +++ b/net/src/test/resources/protocol.xml @@ -3,7 +3,7 @@ - + @@ -19,7 +19,7 @@ - + @@ -35,7 +35,7 @@ - + @@ -72,13 +72,13 @@ - + - +