perf[module]: 以TcpServerTest为例子,从头梳理asyncAask的执行流程并添加注释

This commit is contained in:
jianan
2022-07-07 01:08:30 +08:00
parent c2c4ef6320
commit 9753d576a7
12 changed files with 134 additions and 5 deletions
@@ -24,6 +24,7 @@ public class ConsumerModule {
private ProtocolModule protocolModule;
// 负载均衡方式
private String loadBalancer;
// 消费哪个provider
@@ -17,13 +17,20 @@ import com.zfoo.protocol.registration.ProtocolModule;
import java.util.Objects;
/**
*
* @author jaysunxiao
* @version 3.0
*/
public class ProviderModule {
/**
* 模块id和模块名
*/
private ProtocolModule protocolModule;
/**
* 提供者名字
*/
private String provider;
public ProviderModule(ProtocolModule protocolModule, String provider) {
@@ -42,9 +42,11 @@ public class RegisterVO {
private static final String uuid = IdUtils.getUUID();
private String id;
private ProviderConfig providerConfig;
private ConsumerConfig consumerConfig;
// 服务提供者配置
private ProviderConfig providerConfig;
// 服务消费者配置
private ConsumerConfig consumerConfig;
public static boolean providerHasConsumer(RegisterVO providerVO, RegisterVO consumerVO) {
if (Objects.isNull(providerVO) || Objects.isNull(providerVO.providerConfig) || CollectionUtils.isEmpty(providerVO.providerConfig.getProviders())
@@ -137,30 +139,39 @@ public class RegisterVO {
@Override
public String toString() {
var builder = new StringBuilder();
// 模块模块名
builder.append(id);
// 服务提供者相关配置信息
if (Objects.nonNull(providerConfig)) {
var providerAddress = providerConfig.getAddress();
if (StringUtils.isBlank(providerAddress)) {
throw new RuntimeException(StringUtils.format("providerConfig的address不能为空"));
}
builder.append(StringUtils.SPACE).append(StringUtils.VERTICAL_BAR).append(StringUtils.SPACE);
// 服务提供者地址
builder.append(providerAddress);
builder.append(StringUtils.SPACE).append(StringUtils.VERTICAL_BAR).append(StringUtils.SPACE);
var providerModules = providerConfig.getProviders().stream()
.map(it -> StringUtils.joinWith(StringUtils.HYPHEN, it.getProtocolModule().getId(), it.getProtocolModule().getName(), it.getProvider()))
.collect(Collectors.toList());
// 服务提供者模块信息列表
builder.append(StringUtils.format("provider:[{}]"
, StringUtils.joinWith(StringUtils.COMMA + StringUtils.SPACE, providerModules.toArray())));
}
// 服务消费者相关信息
if (Objects.nonNull(consumerConfig)) {
builder.append(StringUtils.SPACE).append(StringUtils.VERTICAL_BAR).append(StringUtils.SPACE);
var consumerModules = consumerConfig.getConsumers().stream()
.map(it -> StringUtils.joinWith(StringUtils.HYPHEN, it.getProtocolModule().getId(), it.getProtocolModule().getName(), it.getLoadBalancer(), it.getConsumer()))
.collect(Collectors.toList());
// 服务消费者模块信息列表
builder.append(StringUtils.format("consumer:[{}]"
, StringUtils.joinWith(StringUtils.COMMA + StringUtils.SPACE, consumerModules.toArray())));
}
@@ -69,6 +69,13 @@ public class Router implements IRouter {
private final FastThreadLocal<SignalAttachment> serverReceiveSignalAttachmentThreadLocal = new FastThreadLocal<>();
/**
* 在服务端收到数据后,会调用这个方法. 这个方法在BaseRouteHandler.java的channelRead中被调用
*
* @param session
* @param packet
* @param attachment
*/
@Override
public void receive(Session session, IPacket packet, @Nullable IAttachment attachment) {
if (packet.protocolId() == Heartbeat.PROTOCOL_ID) {
@@ -90,11 +97,14 @@ public class Router implements IRouter {
// 客户端收到服务器应答,客户端发送的时候isClient为true,服务器收到的时候将其设置为false
var removedAttachment = (SignalAttachment) SignalBridge.removeSignalAttachment(signalAttachment);
if (removedAttachment != null) {
// 这里会让之前的CompletableFuture得到结果,从而像asyncAsk之类的回调到结果
removedAttachment.getResponseFuture().complete(packet);
} else {
logger.error("client receives packet:[{}] and attachment:[{}] from server, but clientAttachmentMap has no attachment, perhaps timeout exception."
, JsonUtils.object2String(packet), JsonUtils.object2String(attachment));
}
// 注意:这个return,这样子,asyncAsk的结果就返回了。
return;
}
@@ -137,7 +147,8 @@ public class Router implements IRouter {
}
}
// 正常发送消息的接收
// 正常发送消息的接收,把客户端的业务请求包装下到路由策略指定的线程进行业务处理
// 注意:像客户端以asyncAsk发送请求,在服务器处理完后返回结果,也是进入这个receive方法,但是attachment不为空,会提前return掉不会走到这
TaskBus.submit(new PacketReceiverTask(session, packet, attachment));
}
@@ -207,10 +218,23 @@ public class Router implements IRouter {
}
/**
* 注意:这个里面其实还是调用send发送的消息
*
* @param session
* @param packet
* @param answerClass
* @param argument
* @param <T>
* @return
*/
@Override
public <T extends IPacket> AsyncAnswer<T> asyncAsk(Session session, IPacket packet, @Nullable Class<T> answerClass, @Nullable Object argument) {
var clientSignalAttachment = new SignalAttachment();
// 因此第3个参数传null,会得到一个随机的值,在得到结果回调时,是随机的线程
// 这个值用于返回时,在CompletableFuture中选择哪个线程执行,也就是回到哪个线程
var executorConsistentHash = (argument == null) ? RandomUtils.randomInt() : HashUtils.fnvHash(argument);
clientSignalAttachment.setExecutorConsistentHash(executorConsistentHash);
// 服务器在同步或异步的消息处理中,又调用了同步或异步的方法,这时候threadReceiverAttachment不为空
@@ -221,6 +245,7 @@ public class Router implements IRouter {
asyncAnswer.setSignalAttachment(clientSignalAttachment);
clientSignalAttachment.getResponseFuture()
// 因此超时的情况,返回的是null
.completeOnTimeout(null, DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS)
.thenApply(answer -> {
if (answer == null) {
@@ -237,6 +262,7 @@ public class Router implements IRouter {
return answer;
})
.whenCompleteAsync((answer, throwable) -> {
// 注意:进入这个方法的时机是:在上面的receive方法中,由于是asyncAsk的消息,attachment不为空,会调用CompletableFuture的complete方法
try {
SignalBridge.removeSignalAttachment(clientSignalAttachment);
@@ -302,7 +328,7 @@ public class Router implements IRouter {
}
}
// 调用PacketReceiver
// 调用PacketReceiver,进行真正的业务处理
PacketBus.submit(session, packet, attachment);
} catch (Exception e) {
logger.error(StringUtils.format("e[uid:{}][sid:{}]未知exception异常", session.getAttribute(AttributeType.UID), session.getSid(), e.getMessage()), e);
@@ -46,6 +46,8 @@ public class AsyncAnswer<T extends IPacket> implements IAsyncAnswer<T> {
@Override
public void whenComplete(Consumer<T> consumer) {
thenAccept(consumer);
// 这里其实会触发发送消息
askCallback.run();
}
@@ -55,6 +55,7 @@ public class NetDefinitionParser implements BeanDefinitionParser {
// 注册ConfigManager
clazz = ConfigManager.class;
builder = BeanDefinitionBuilder.rootBeanDefinition(clazz);
// 把Spring容器中的NetConfig引用赋值给ConfigManager中的localConfig属性
builder.addPropertyReference("localConfig", NetConfig.class.getCanonicalName());
parserContext.getRegistry().registerBeanDefinition(clazz.getCanonicalName(), builder.getBeanDefinition());
@@ -74,6 +75,7 @@ public class NetDefinitionParser implements BeanDefinitionParser {
parserContext.getRegistry().registerBeanDefinition(clazz.getCanonicalName(), builder.getBeanDefinition());
// 注册SessionManager
// 里面的serverSessionMap 和 clientSessionMap 是根据自己是客户端还是服务器保存连接自己 和 自己连接 的网络节点信息
clazz = SessionManager.class;
builder = BeanDefinitionBuilder.rootBeanDefinition(clazz);
parserContext.getRegistry().registerBeanDefinition(clazz.getCanonicalName(), builder.getBeanDefinition());
@@ -80,6 +80,7 @@ public final class TaskBus {
* SignalAttachmentexecutorConsistentHash通过IRouter和IConsumer的argument参数指定
*/
public static void submit(PacketReceiverTask task) {
// 里面会看到是:其中一致性hash是根据附加包记录的hashId进行选择哪个线程进行业务处理
taskDispatch.getExecutor(task).execute(task);
}
@@ -38,6 +38,7 @@ public class ConsistentHashTaskDispatch extends AbstractTaskDispatch {
return SessionIdTaskDispatch.getInstance().getExecutor(packetReceiverTask);
}
// 可见最终是根据附加包的信息选择服务端由哪个线程执行这个业务
return TaskBus.executor(attachment.executorConsistentHash());
}
@@ -34,25 +34,53 @@ public class RegistryTest {
@Test
public void registerVoTest() {
// 定义2个模块:可以为服务提供者用,也可以为服务消费者用,这个仅仅是模块信息
var protocolModule1 = new ProtocolModule((byte) 100, "aaa");
var protocolModule2 = new ProtocolModule((byte) 120, "bbb");
// 服务提供者模块列表和服务提供者配置
// 定义2个服务提供者模块
var providerModules = List.of(new ProviderModule(protocolModule1, "a"), new ProviderModule(protocolModule2, "b"));
// 服务器提供者配置:服务提供者的ip + 服务提供者模块
var providerConfig = ProviderConfig.valueOf(HostAndPort.valueOf("127.0.0.1", 80).toHostAndPortStr(), providerModules);
// 服务消费者模块和服务消费者配置(服务消费者模块多一个负载均衡属性)
var consumerModules = List.of(new ConsumerModule(protocolModule1, "random", "a"), new ConsumerModule(protocolModule2, "random", "b"));
// 服务消费者配置:这个是没Ip的
var consumerConfig = ConsumerConfig.valueOf(consumerModules);
var vo = RegisterVO.valueOf("test", providerConfig, consumerConfig);
var voStr = vo.toString();
// test | 127.0.0.1:80 | provider:[100-aaa-a, 120-bbb-b] | consumer:[100-aaa-random-a, 120-bbb-random-b]
System.out.println(voStr);
var newVo = RegisterVO.parseString(voStr);
Assert.assertEquals(vo, newVo);
// /127.0.0.1
System.out.println(NetUtil.LOCALHOST);
// localhost/127.0.0.1
System.out.println(NetUtil.LOCALHOST4);
//localhost/0:0:0:0:0:0:0:1
System.out.println(NetUtil.LOCALHOST6);
// 200
System.out.println(NetUtil.SOMAXCONN);
// name:lo (Software Loopback Interface 1)
System.out.println(NetUtil.LOOPBACK_IF);
}
}
/*
test | 127.0.0.1:80 | provider:[100-aaa-a, 120-bbb-b] | consumer:[100-aaa-random-a, 120-bbb-random-b]
/127.0.0.1
localhost/127.0.0.1
localhost/0:0:0:0:0:0:0:1
200
name:lo (Software Loopback Interface 1)
*/
@@ -40,12 +40,19 @@ public class TcpClientTest {
var context = new ClassPathXmlApplicationContext("config.xml");
var client = new TcpClient(HostAndPort.valueOf("127.0.0.1:9000"));
// 得到连接到服务器的session
var session = client.start();
for (int i = 0; i < 1000; i++) {
var ask = new AsyncMessAsk();
ask.setMessage("Hello, this is async client!");
// Router只是一个接口,它的赋值也是在容器初始化好之后从Spring容器获取到的Router对象
// 要发送网络数据,要通过方法asyncAsk,第一个参数就是要发送消息的session
// 注意:第3个argument参数是当收到消息后,是要要到哪个线程处理,这是CompletableFuture保证的
// 其实这个依然是通过session,然后调用send发送过去的消息
// 最终依然是通过BaseRouteHandler得到响应,只不过attachment不为空,会触发CompletableFuture的completable方法,从而得到回调结果
NetContext.getRouter().asyncAsk(session, ask, AsyncMessAnswer.class, null)
.whenComplete(answer -> {
logger.info("异步请求收到结果[{}]", JsonUtils.object2String(answer));
@@ -29,12 +29,53 @@ public class TcpServerTest {
@Test
public void startServer() {
// 启动流程笔记:
// 首先,这里是xml配置,这样子就会解析xml配置中的标签
// 当遇到自定义标签后,就会触发NamespaceHandler 和 NetDefinitionParser 的自定义标签解析流程
// 接着就会往Spring容器注册相关的各种bean
var context = new ClassPathXmlApplicationContext("config.xml");
// 打印下当前spring管理的bean信息
// 可以得出结论:通过解析自定义标签,往spring容器中自己主动注册的几个
// 和网络有关的全类名的bean(NetConfig、NetContext、ConfigManager、PacketService、Router、Consumer、SessionManager)
String[] beanNames = context.getBeanDefinitionNames();
for (String beanName : beanNames) {
System.out.println(beanName);
}
// 启动起来一个Tcp服务器
// 注意:由于之前往Spring容器中注册的SessionManager,从而可以使用这个bean变量管理所有的连接信息
// 这也是spring的好处:很多东西是解耦的,让SessionManager的初始化交给的是spring,而不是自己从main函数开始一个个组件去初始化
var server = new TcpServer(HostAndPort.valueOf("127.0.0.1:9000"));
server.start();
ThreadUtils.sleep(Long.MAX_VALUE);
}
}
/*
serverPacketController
gatewayProviderController
JProtobufTcpClientController
JProtobufTcpController
providerController
tcpClientController
tcpServerController
tcpAsyncController
tcpSyncController
udpClientPacketController
udpServerPacketController
websocketClientPacketController
websocketServerPacketController
org.springframework.context.annotation.internalConfigurationAnnotationProcessor
org.springframework.context.annotation.internalAutowiredAnnotationProcessor
org.springframework.context.event.internalEventListenerProcessor
org.springframework.context.event.internalEventListenerFactory
com.zfoo.net.config.model.NetConfig
com.zfoo.net.NetContext
com.zfoo.net.config.manager.ConfigManager
com.zfoo.net.packet.service.PacketService
com.zfoo.net.router.Router
com.zfoo.net.consumer.Consumer
com.zfoo.net.session.manager.SessionManager
*/
@@ -16,6 +16,8 @@ package com.zfoo.protocol.registration;
import com.zfoo.protocol.util.StringUtils;
/**
* 模块id和模块名的封装
*
* @author jaysunxiao
* @version 3.0
*/