perf[net]: 重构net,把模块号添加到注册列表中

This commit is contained in:
jaysunxiao
2022-07-02 10:50:03 +08:00
parent ef66cfbc3a
commit b2c0bd3073
8 changed files with 106 additions and 54 deletions
@@ -107,6 +107,7 @@ public class NetContext implements ApplicationListener<ApplicationContextEvent>,
instance.packetService.init();
instance.configManager.initRegistry();
instance.consumer.init();
logger.info("Net started successfully and cost [{}] seconds", stopWatch.costSeconds());
} else if (event instanceof ContextClosedEvent) {
@@ -16,7 +16,6 @@ package com.zfoo.net.consumer;
import com.zfoo.net.NetContext;
import com.zfoo.net.consumer.balancer.AbstractConsumerLoadBalancer;
import com.zfoo.net.consumer.balancer.IConsumerLoadBalancer;
import com.zfoo.net.consumer.registry.RegisterVO;
import com.zfoo.net.packet.common.Error;
import com.zfoo.net.router.Router;
import com.zfoo.net.router.answer.AsyncAnswer;
@@ -27,12 +26,9 @@ import com.zfoo.net.router.exception.ErrorResponseException;
import com.zfoo.net.router.exception.NetTimeOutException;
import com.zfoo.net.router.exception.UnexpectedProtocolException;
import com.zfoo.net.router.route.SignalBridge;
import com.zfoo.net.session.model.AttributeType;
import com.zfoo.net.session.model.Session;
import com.zfoo.protocol.IPacket;
import com.zfoo.protocol.ProtocolManager;
import com.zfoo.protocol.collection.CollectionUtils;
import com.zfoo.protocol.exception.RunException;
import com.zfoo.protocol.registration.ProtocolModule;
import com.zfoo.protocol.util.JsonUtils;
import com.zfoo.protocol.util.StringUtils;
@@ -41,8 +37,8 @@ import com.zfoo.util.math.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -58,27 +54,29 @@ public class Consumer implements IConsumer {
private static final Logger logger = LoggerFactory.getLogger(Consumer.class);
private final Map<ProtocolModule, IConsumerLoadBalancer> consumerLoadBalancerMap = new HashMap<>();
@Override
public IConsumerLoadBalancer loadBalancer(ProtocolModule protocolModule) {
public void init() {
var consumerConfig = NetContext.getConfigManager().getLocalConfig().getConsumer();
if (consumerConfig == null || CollectionUtils.isEmpty(consumerConfig.getConsumers())) {
throw new RunException("没有配置服务消费者,无法消费");
return;
}
var consumers = consumerConfig.getConsumers();
for (var consumer : consumers) {
if (consumer.getProtocolModule().equals(protocolModule)) {
return AbstractConsumerLoadBalancer.valueOf(consumer.getLoadBalancer());
}
consumerLoadBalancerMap.put(consumer.getProtocolModule(), AbstractConsumerLoadBalancer.valueOf(consumer.getLoadBalancer()));
}
return null;
}
@Override
public IConsumerLoadBalancer loadBalancer(ProtocolModule protocolModule) {
return consumerLoadBalancerMap.get(protocolModule);
}
@Override
public void send(IPacket packet, Object argument) {
try {
var module = ProtocolManager.moduleByProtocolId(packet.protocolId());
var loadBalancer = loadBalancer(module);
var loadBalancer = loadBalancer(ProtocolManager.moduleByProtocolId(packet.protocolId()));
var session = loadBalancer.loadBalancer(packet, argument);
var executorConsistentHash = (argument == null) ? RandomUtils.randomInt() : HashUtils.fnvHash(argument);
NetContext.getRouter().send(session, packet, NoAnswerAttachment.valueOf(executorConsistentHash));
@@ -89,8 +87,7 @@ public class Consumer implements IConsumer {
@Override
public <T extends IPacket> SyncAnswer<T> syncAsk(IPacket packet, Class<T> answerClass, Object argument) throws Exception {
var module = ProtocolManager.moduleByProtocolId(packet.protocolId());
var loadBalancer = loadBalancer(module);
var loadBalancer = loadBalancer(ProtocolManager.moduleByProtocolId(packet.protocolId()));
var session = loadBalancer.loadBalancer(packet, argument);
@@ -131,8 +128,7 @@ public class Consumer implements IConsumer {
@Override
public <T extends IPacket> AsyncAnswer<T> asyncAsk(IPacket packet, Class<T> answerClass, Object argument) {
var module = ProtocolManager.moduleByProtocolId(packet.protocolId());
var loadBalancer = loadBalancer(module);
var loadBalancer = loadBalancer(ProtocolManager.moduleByProtocolId(packet.protocolId()));
var session = loadBalancer.loadBalancer(packet, argument);
var asyncAnswer = NetContext.getRouter().asyncAsk(session, packet, answerClass, argument);
@@ -26,6 +26,8 @@ import org.springframework.lang.Nullable;
*/
public interface IConsumer {
void init();
IConsumerLoadBalancer loadBalancer(ProtocolModule protocolModule);
/**
@@ -63,7 +63,7 @@ public abstract class AbstractConsumerLoadBalancer implements IConsumerLoadBalan
var attribute = it.getAttribute(AttributeType.CONSUMER);
if (Objects.nonNull(attribute)) {
var registerVO = (RegisterVO) attribute;
return Objects.nonNull(registerVO.getProviderConfig()) && registerVO.getProviderConfig().getProviders().stream().anyMatch(provider -> provider.getProtocolModule().getId() == module.getId());
return Objects.nonNull(registerVO.getProviderConfig()) && registerVO.getProviderConfig().getProviders().stream().anyMatch(provider -> provider.getProtocolModule().equals(module));
} else {
return false;
}
@@ -17,9 +17,9 @@ import com.zfoo.net.config.model.ConsumerConfig;
import com.zfoo.net.config.model.ConsumerModule;
import com.zfoo.net.config.model.ProviderConfig;
import com.zfoo.net.config.model.ProviderModule;
import com.zfoo.protocol.ProtocolManager;
import com.zfoo.protocol.collection.CollectionUtils;
import com.zfoo.protocol.exception.ExceptionUtils;
import com.zfoo.protocol.registration.ProtocolModule;
import com.zfoo.protocol.util.StringUtils;
import com.zfoo.util.security.IdUtils;
import org.slf4j.Logger;
@@ -46,13 +46,13 @@ public class RegisterVO {
private ConsumerConfig consumerConfig;
public static boolean providerHasConsumerModule(RegisterVO providerVO, RegisterVO consumerVO) {
public static boolean providerHasConsumer(RegisterVO providerVO, RegisterVO consumerVO) {
if (Objects.isNull(providerVO) || Objects.isNull(providerVO.providerConfig) || CollectionUtils.isEmpty(providerVO.providerConfig.getProviders())
|| Objects.isNull(consumerVO) || Objects.isNull(consumerVO.consumerConfig) || CollectionUtils.isEmpty(consumerVO.consumerConfig.getConsumers())) {
return false;
}
for (var provider : providerVO.getProviderConfig().getProviders()) {
if (consumerVO.getConsumerConfig().getConsumers().stream().anyMatch(it -> it.getConsumer().equals(provider.getProvider()))) {
if (consumerVO.getConsumerConfig().getConsumers().stream().anyMatch(it -> provider.getProvider().equals(it.getConsumer()))) {
return true;
}
}
@@ -106,7 +106,7 @@ public class RegisterVO {
var modules = Arrays.stream(moduleSplits)
.map(it -> it.trim())
.map(it -> it.split(StringUtils.HYPHEN))
.map(it -> new ProviderModule(ProtocolManager.moduleByModuleName(StringUtils.trim(it[0])), StringUtils.trim(it[1])))
.map(it -> new ProviderModule(new ProtocolModule(Byte.parseByte(it[0]), it[1]), it[2]))
.collect(Collectors.toList());
return modules;
}
@@ -119,7 +119,7 @@ public class RegisterVO {
var modules = Arrays.stream(moduleSplits)
.map(it -> it.trim())
.map(it -> it.split(StringUtils.HYPHEN))
.map(it -> new ConsumerModule(ProtocolManager.moduleByModuleName(StringUtils.trim(it[0])), StringUtils.trim(it[1]), StringUtils.trim(it[2])))
.map(it -> new ConsumerModule(new ProtocolModule(Byte.parseByte(it[0]), it[1]), it[2], it[3]))
.collect(Collectors.toList());
return modules;
}
@@ -149,7 +149,7 @@ public class RegisterVO {
builder.append(StringUtils.SPACE).append(StringUtils.VERTICAL_BAR).append(StringUtils.SPACE);
var providerModules = providerConfig.getProviders().stream()
.map(it -> StringUtils.joinWith(StringUtils.HYPHEN, it.getProtocolModule().getName(), it.getProvider()))
.map(it -> StringUtils.joinWith(StringUtils.HYPHEN, it.getProtocolModule().getId(), it.getProtocolModule().getName(), it.getProvider()))
.collect(Collectors.toList());
builder.append(StringUtils.format("provider:[{}]"
, StringUtils.joinWith(StringUtils.COMMA + StringUtils.SPACE, providerModules.toArray())));
@@ -159,7 +159,7 @@ public class RegisterVO {
builder.append(StringUtils.SPACE).append(StringUtils.VERTICAL_BAR).append(StringUtils.SPACE);
var consumerModules = consumerConfig.getConsumers().stream()
.map(it -> StringUtils.joinWith(StringUtils.HYPHEN, it.getProtocolModule().getName(), it.getLoadBalancer(), it.getConsumer()))
.map(it -> StringUtils.joinWith(StringUtils.HYPHEN, it.getProtocolModule().getId(), it.getProtocolModule().getName(), it.getLoadBalancer(), it.getConsumer()))
.collect(Collectors.toList());
builder.append(StringUtils.format("consumer:[{}]"
, StringUtils.joinWith(StringUtils.COMMA + StringUtils.SPACE, consumerModules.toArray())));
@@ -112,13 +112,7 @@ public class ZookeeperRegistry implements IRegistry {
/**
* consumer需要消费的provider集合
*/
private final Set<RegisterVO> providerCacheSet = new ConcurrentHashSet<>();
/**
* 本地注册信息
*/
private final RegisterVO localRegisterVO = NetContext.getConfigManager().getLocalConfig().toLocalRegisterVO();
private final Set<RegisterVO> providerHashConsumerSet = new ConcurrentHashSet<>();
/**
* addListener中的cache全部会被添加到这个集合中,这个集合不包括providerCuratorCache
*/
@@ -282,8 +276,9 @@ public class ZookeeperRegistry implements IRegistry {
case NODE_CREATED:
var providerStr = StringUtils.substringAfterFirst(newData.getPath(), PROVIDER_ROOT_PATH + StringUtils.SLASH);
var provider = RegisterVO.parseString(providerStr);
if (RegisterVO.providerHasConsumerModule(provider, localRegisterVO)) {
providerCacheSet.add(provider);
var localRegisterVO = NetContext.getConfigManager().getLocalConfig().toLocalRegisterVO();
if (RegisterVO.providerHasConsumer(provider, localRegisterVO)) {
providerHashConsumerSet.add(provider);
checkConsumer();
logger.info("发现新的订阅服务[{}]", providerStr);
}
@@ -291,8 +286,8 @@ public class ZookeeperRegistry implements IRegistry {
case NODE_DELETED:
var oldProviderStr = StringUtils.substringAfterFirst(oldData.getPath(), PROVIDER_ROOT_PATH + StringUtils.SLASH);
var oldProvider = RegisterVO.parseString(oldProviderStr);
if (providerCacheSet.contains(oldProvider)) {
providerCacheSet.remove(oldProvider);
if (providerHashConsumerSet.contains(oldProvider)) {
providerHashConsumerSet.remove(oldProvider);
checkConsumer();
logger.info("取消订阅服务[{}]", oldProviderStr);
}
@@ -329,6 +324,7 @@ public class ZookeeperRegistry implements IRegistry {
}
private void initLocalProvider() throws Exception {
var localRegisterVO = NetContext.getConfigManager().getLocalConfig().toLocalRegisterVO();
if (Objects.nonNull(localRegisterVO.getProviderConfig())) {
var localProviderVoStr = localRegisterVO.toProviderString();
var localProviderPath = PROVIDER_ROOT_PATH + StringUtils.SLASH + localProviderVoStr;
@@ -357,16 +353,17 @@ public class ZookeeperRegistry implements IRegistry {
}
private void initConsumerCache() throws Exception {
var localRegisterVO = NetContext.getConfigManager().getLocalConfig().toLocalRegisterVO();
// 初始化providerCacheSet
var remoteProviderSet = curator.getChildren().forPath(PROVIDER_ROOT_PATH).stream()
.filter(it -> StringUtils.isNotBlank(it) && !"null".equals(it))
.map(it -> RegisterVO.parseString(it))
.filter(it -> Objects.nonNull(it))
.filter(it -> RegisterVO.providerHasConsumerModule(it, localRegisterVO))
.filter(it -> RegisterVO.providerHasConsumer(it, localRegisterVO))
.collect(Collectors.toSet());
providerCacheSet.clear();
providerCacheSet.addAll(remoteProviderSet);
providerHashConsumerSet.clear();
providerHashConsumerSet.addAll(remoteProviderSet);
// 初始化consumerproviderCacheSet改变会导致消费者改变
checkConsumer();
@@ -391,11 +388,12 @@ public class ZookeeperRegistry implements IRegistry {
return;
}
logger.info("开始通过providerCacheSet:{}检查[consumer:{}]", providerCacheSet, NetContext.getSessionManager().getClientSessionMap().size());
logger.info("开始通过providerHashConsumerSet:{}检查[consumer:{}]", providerHashConsumerSet, NetContext.getSessionManager().getClientSessionMap().size());
var recheckFlag = false;
for (var providerCache : providerCacheSet) {
for (var providerCache : providerHashConsumerSet) {
// 先排除已经启动的consumer
var consumerClientList = NetContext.getSessionManager().getClientSessionMap().values().stream()
.filter(it -> {
var attribute = it.getAttribute(AttributeType.CONSUMER);
@@ -428,6 +426,7 @@ public class ZookeeperRegistry implements IRegistry {
EventBus.asyncSubmit(ConsumerStartEvent.valueOf(providerCache, session));
try {
var localRegisterVO = NetContext.getConfigManager().getLocalConfig().toLocalRegisterVO();
var path = CONSUMER_ROOT_PATH + StringUtils.SLASH + localRegisterVO.toConsumerString();
var stat = curator.checkExists().forPath(path);
if (Objects.isNull(stat)) {
@@ -446,12 +445,7 @@ public class ZookeeperRegistry implements IRegistry {
}
if (recheckFlag) {
SchedulerBus.schedule(new Runnable() {
@Override
public void run() {
checkConsumer();
}
}, RETRY_SECONDS, TimeUnit.SECONDS);
SchedulerBus.schedule(() -> checkConsumer(), RETRY_SECONDS, TimeUnit.SECONDS);
}
}
@@ -582,6 +576,7 @@ public class ZookeeperRegistry implements IRegistry {
return;
}
try {
var localRegisterVO = NetContext.getConfigManager().getLocalConfig().toLocalRegisterVO();
if (curator.getState() == CuratorFrameworkState.STARTED) {
// 删除服务提供者的临时节点
if (Objects.nonNull(localRegisterVO.getProviderConfig())) {
@@ -0,0 +1,58 @@
/*
* Copyright (C) 2020 The zfoo Authors
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
*
*/
package com.zfoo.net.config;
import com.zfoo.net.config.model.ConsumerConfig;
import com.zfoo.net.config.model.ConsumerModule;
import com.zfoo.net.config.model.ProviderConfig;
import com.zfoo.net.config.model.ProviderModule;
import com.zfoo.net.consumer.registry.RegisterVO;
import com.zfoo.protocol.registration.ProtocolModule;
import com.zfoo.util.net.HostAndPort;
import io.netty.util.NetUtil;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
/**
* @author jaysunxiao
* @version 3.0
*/
public class RegistryTest {
@Test
public void registerVoTest() {
var protocolModule1 = new ProtocolModule((byte) 100, "aaa");
var protocolModule2 = new ProtocolModule((byte) 120, "bbb");
var providerModules = List.of(new ProviderModule(protocolModule1, "a"), new ProviderModule(protocolModule2, "b"));
var providerConfig = ProviderConfig.valueOf(HostAndPort.valueOf("127.0.0.1", 80).toHostAndPortStr(), providerModules);
var consumerModules = List.of(new ConsumerModule(protocolModule1, "random", "a"), new ConsumerModule(protocolModule2, "random", "b"));
var consumerConfig = ConsumerConfig.valueOf(consumerModules);
var vo = RegisterVO.valueOf("test", providerConfig, consumerConfig);
var voStr = vo.toString();
System.out.println(voStr);
var newVo = RegisterVO.parseString(voStr);
Assert.assertEquals(vo, newVo);
System.out.println(NetUtil.LOCALHOST);
System.out.println(NetUtil.LOCALHOST4);
System.out.println(NetUtil.LOCALHOST6);
System.out.println(NetUtil.SOMAXCONN);
System.out.println(NetUtil.LOOPBACK_IF);
}
}
+5 -5
View File
@@ -3,7 +3,7 @@
<!-- native为内部消息,common是公共消息每个模块都能使用,js是web通信用的协议会生成js协议文件 -->
<protocols author="jaysunxiao">
<module id="1" name="native" minId="0" maxId="100" version="99.99.999">
<module id="1" name="native" minId="0" maxId="100">
<protocol location="com.zfoo.net.router.attachment.SignalAttachment"/>
<protocol location="com.zfoo.net.router.attachment.GatewayAttachment"/>
<protocol location="com.zfoo.net.router.attachment.UdpAttachment"/>
@@ -19,7 +19,7 @@
</module>
<!-- 在xml文件中写协议号是为了统一规划协议号,更加直观;不写协议号也没有影响 -->
<module id="2" name="common" minId="100" maxId="1000" version="99.99.999">
<module id="2" name="common" minId="100" maxId="1000">
<protocol id="100" location="com.zfoo.net.packet.common.Message"/>
<protocol id="101" location="com.zfoo.net.packet.common.Error"/>
<protocol id="102" location="com.zfoo.net.packet.common.Heartbeat"/>
@@ -35,7 +35,7 @@
</module>
<module id="3" name="test" minId="1000" maxId="2000" version="1.0.0">
<module id="3" name="test" minId="1000" maxId="2000">
<protocol id="1110" location="com.zfoo.net.packet.CM_Int" enhance="false"/>
<protocol id="1111" location="com.zfoo.net.packet.SM_Int" enhance="false"/>
<protocol id="1112" location="com.zfoo.net.packet.CM_Float" enhance="false"/>
@@ -72,13 +72,13 @@
<protocol id="1501" location="com.zfoo.net.packet.jprotobuf.JProtobufHelloResponse"/>
</module>
<module id="4" name="js" minId="2000" maxId="3000" version="1.0.0">
<module id="4" name="js" minId="2000" maxId="3000">
<protocol id="2070" location="com.zfoo.net.packet.websocket.WebSocketPacketRequest" enhance="false"/>
<protocol id="2071" location="com.zfoo.net.packet.websocket.WebSocketObjectA" enhance="false"/>
<protocol id="2072" location="com.zfoo.net.packet.websocket.WebSocketObjectB" enhance="false"/>
</module>
<module id="5" name="providerTest" minId="3000" maxId="8000" version="1.0.0">
<module id="5" name="providerTest" minId="3000" maxId="8000">
<protocol id="4000" location="com.zfoo.net.packet.provider.ProviderMessAsk" enhance="false"/>
<protocol id="4001" location="com.zfoo.net.packet.provider.ProviderMessAnswer" enhance="false"/>