From 9753d576a7b80cd50fcbe8ac80d4e1e1fda1bb1f Mon Sep 17 00:00:00 2001 From: jianan <1072772483@qq.com> Date: Thu, 7 Jul 2022 01:08:30 +0800 Subject: [PATCH 1/2] =?UTF-8?q?perf[module]:=20=E4=BB=A5TcpServerTest?= =?UTF-8?q?=E4=B8=BA=E4=BE=8B=E5=AD=90=EF=BC=8C=E4=BB=8E=E5=A4=B4=E6=A2=B3?= =?UTF-8?q?=E7=90=86asyncAask=E7=9A=84=E6=89=A7=E8=A1=8C=E6=B5=81=E7=A8=8B?= =?UTF-8?q?=E5=B9=B6=E6=B7=BB=E5=8A=A0=E6=B3=A8=E9=87=8A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../zfoo/net/config/model/ConsumerModule.java | 1 + .../zfoo/net/config/model/ProviderModule.java | 7 +++ .../net/consumer/registry/RegisterVO.java | 15 ++++++- .../main/java/com/zfoo/net/router/Router.java | 30 ++++++++++++- .../zfoo/net/router/answer/AsyncAnswer.java | 2 + .../zfoo/net/schema/NetDefinitionParser.java | 2 + .../main/java/com/zfoo/net/task/TaskBus.java | 1 + .../ConsistentHashTaskDispatch.java | 1 + .../com/zfoo/net/config/RegistryTest.java | 28 ++++++++++++ .../core/tcpAsync/client/TcpClientTest.java | 7 +++ .../core/tcpAsync/server/TcpServerTest.java | 43 ++++++++++++++++++- .../protocol/registration/ProtocolModule.java | 2 + 12 files changed, 134 insertions(+), 5 deletions(-) diff --git a/net/src/main/java/com/zfoo/net/config/model/ConsumerModule.java b/net/src/main/java/com/zfoo/net/config/model/ConsumerModule.java index 8b75789e..f711144e 100644 --- a/net/src/main/java/com/zfoo/net/config/model/ConsumerModule.java +++ b/net/src/main/java/com/zfoo/net/config/model/ConsumerModule.java @@ -24,6 +24,7 @@ public class ConsumerModule { private ProtocolModule protocolModule; + // 负载均衡方式 private String loadBalancer; // 消费哪个provider diff --git a/net/src/main/java/com/zfoo/net/config/model/ProviderModule.java b/net/src/main/java/com/zfoo/net/config/model/ProviderModule.java index fab8dd05..fe1b4409 100644 --- a/net/src/main/java/com/zfoo/net/config/model/ProviderModule.java +++ b/net/src/main/java/com/zfoo/net/config/model/ProviderModule.java @@ -17,13 +17,20 @@ import com.zfoo.protocol.registration.ProtocolModule; import java.util.Objects; /** + * * @author jaysunxiao * @version 3.0 */ public class ProviderModule { + /** + * 模块id和模块名 + */ private ProtocolModule protocolModule; + /** + * 提供者名字 + */ private String provider; public ProviderModule(ProtocolModule protocolModule, String provider) { diff --git a/net/src/main/java/com/zfoo/net/consumer/registry/RegisterVO.java b/net/src/main/java/com/zfoo/net/consumer/registry/RegisterVO.java index 4b64bfbc..3c22a3ee 100644 --- a/net/src/main/java/com/zfoo/net/consumer/registry/RegisterVO.java +++ b/net/src/main/java/com/zfoo/net/consumer/registry/RegisterVO.java @@ -42,9 +42,11 @@ public class RegisterVO { private static final String uuid = IdUtils.getUUID(); private String id; - private ProviderConfig providerConfig; - private ConsumerConfig consumerConfig; + // 服务提供者配置 + private ProviderConfig providerConfig; + // 服务消费者配置 + private ConsumerConfig consumerConfig; public static boolean providerHasConsumer(RegisterVO providerVO, RegisterVO consumerVO) { if (Objects.isNull(providerVO) || Objects.isNull(providerVO.providerConfig) || CollectionUtils.isEmpty(providerVO.providerConfig.getProviders()) @@ -137,30 +139,39 @@ public class RegisterVO { @Override public String toString() { var builder = new StringBuilder(); + + // 模块模块名 builder.append(id); + // 服务提供者相关配置信息 if (Objects.nonNull(providerConfig)) { var providerAddress = providerConfig.getAddress(); if (StringUtils.isBlank(providerAddress)) { throw new RuntimeException(StringUtils.format("providerConfig的address不能为空")); } builder.append(StringUtils.SPACE).append(StringUtils.VERTICAL_BAR).append(StringUtils.SPACE); + // 服务提供者地址 builder.append(providerAddress); builder.append(StringUtils.SPACE).append(StringUtils.VERTICAL_BAR).append(StringUtils.SPACE); var providerModules = providerConfig.getProviders().stream() .map(it -> StringUtils.joinWith(StringUtils.HYPHEN, it.getProtocolModule().getId(), it.getProtocolModule().getName(), it.getProvider())) .collect(Collectors.toList()); + + // 服务提供者模块信息列表 builder.append(StringUtils.format("provider:[{}]" , StringUtils.joinWith(StringUtils.COMMA + StringUtils.SPACE, providerModules.toArray()))); } + // 服务消费者相关信息 if (Objects.nonNull(consumerConfig)) { builder.append(StringUtils.SPACE).append(StringUtils.VERTICAL_BAR).append(StringUtils.SPACE); var consumerModules = consumerConfig.getConsumers().stream() .map(it -> StringUtils.joinWith(StringUtils.HYPHEN, it.getProtocolModule().getId(), it.getProtocolModule().getName(), it.getLoadBalancer(), it.getConsumer())) .collect(Collectors.toList()); + + // 服务消费者模块信息列表 builder.append(StringUtils.format("consumer:[{}]" , StringUtils.joinWith(StringUtils.COMMA + StringUtils.SPACE, consumerModules.toArray()))); } diff --git a/net/src/main/java/com/zfoo/net/router/Router.java b/net/src/main/java/com/zfoo/net/router/Router.java index c441ba87..0c3a7843 100644 --- a/net/src/main/java/com/zfoo/net/router/Router.java +++ b/net/src/main/java/com/zfoo/net/router/Router.java @@ -69,6 +69,13 @@ public class Router implements IRouter { private final FastThreadLocal serverReceiveSignalAttachmentThreadLocal = new FastThreadLocal<>(); + /** + * 在服务端收到数据后,会调用这个方法. 这个方法在BaseRouteHandler.java的channelRead中被调用 + * + * @param session + * @param packet + * @param attachment + */ @Override public void receive(Session session, IPacket packet, @Nullable IAttachment attachment) { if (packet.protocolId() == Heartbeat.PROTOCOL_ID) { @@ -90,11 +97,14 @@ public class Router implements IRouter { // 客户端收到服务器应答,客户端发送的时候isClient为true,服务器收到的时候将其设置为false var removedAttachment = (SignalAttachment) SignalBridge.removeSignalAttachment(signalAttachment); if (removedAttachment != null) { + // 这里会让之前的CompletableFuture得到结果,从而像asyncAsk之类的回调到结果 removedAttachment.getResponseFuture().complete(packet); } else { logger.error("client receives packet:[{}] and attachment:[{}] from server, but clientAttachmentMap has no attachment, perhaps timeout exception." , JsonUtils.object2String(packet), JsonUtils.object2String(attachment)); } + + // 注意:这个return,这样子,asyncAsk的结果就返回了。 return; } @@ -137,7 +147,8 @@ public class Router implements IRouter { } } - // 正常发送消息的接收 + // 正常发送消息的接收,把客户端的业务请求包装下到路由策略指定的线程进行业务处理 + // 注意:像客户端以asyncAsk发送请求,在服务器处理完后返回结果,也是进入这个receive方法,但是attachment不为空,会提前return掉不会走到这 TaskBus.submit(new PacketReceiverTask(session, packet, attachment)); } @@ -207,10 +218,23 @@ public class Router implements IRouter { } + /** + * 注意:这个里面其实还是调用send发送的消息 + * + * @param session + * @param packet + * @param answerClass + * @param argument + * @param + * @return + */ @Override public AsyncAnswer asyncAsk(Session session, IPacket packet, @Nullable Class answerClass, @Nullable Object argument) { var clientSignalAttachment = new SignalAttachment(); + // 因此第3个参数传null,会得到一个随机的值,在得到结果回调时,是随机的线程 + // 这个值用于返回时,在CompletableFuture中选择哪个线程执行,也就是回到哪个线程 var executorConsistentHash = (argument == null) ? RandomUtils.randomInt() : HashUtils.fnvHash(argument); + clientSignalAttachment.setExecutorConsistentHash(executorConsistentHash); // 服务器在同步或异步的消息处理中,又调用了同步或异步的方法,这时候threadReceiverAttachment不为空 @@ -221,6 +245,7 @@ public class Router implements IRouter { asyncAnswer.setSignalAttachment(clientSignalAttachment); clientSignalAttachment.getResponseFuture() + // 因此超时的情况,返回的是null .completeOnTimeout(null, DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS) .thenApply(answer -> { if (answer == null) { @@ -237,6 +262,7 @@ public class Router implements IRouter { return answer; }) .whenCompleteAsync((answer, throwable) -> { + // 注意:进入这个方法的时机是:在上面的receive方法中,由于是asyncAsk的消息,attachment不为空,会调用CompletableFuture的complete方法 try { SignalBridge.removeSignalAttachment(clientSignalAttachment); @@ -302,7 +328,7 @@ public class Router implements IRouter { } } - // 调用PacketReceiver + // 调用PacketReceiver,进行真正的业务处理 PacketBus.submit(session, packet, attachment); } catch (Exception e) { logger.error(StringUtils.format("e[uid:{}][sid:{}]未知exception异常", session.getAttribute(AttributeType.UID), session.getSid(), e.getMessage()), e); diff --git a/net/src/main/java/com/zfoo/net/router/answer/AsyncAnswer.java b/net/src/main/java/com/zfoo/net/router/answer/AsyncAnswer.java index e39b6ae4..f638ca6c 100644 --- a/net/src/main/java/com/zfoo/net/router/answer/AsyncAnswer.java +++ b/net/src/main/java/com/zfoo/net/router/answer/AsyncAnswer.java @@ -46,6 +46,8 @@ public class AsyncAnswer implements IAsyncAnswer { @Override public void whenComplete(Consumer consumer) { thenAccept(consumer); + + // 这里其实会触发发送消息 askCallback.run(); } diff --git a/net/src/main/java/com/zfoo/net/schema/NetDefinitionParser.java b/net/src/main/java/com/zfoo/net/schema/NetDefinitionParser.java index 2647f607..6f7c362b 100644 --- a/net/src/main/java/com/zfoo/net/schema/NetDefinitionParser.java +++ b/net/src/main/java/com/zfoo/net/schema/NetDefinitionParser.java @@ -55,6 +55,7 @@ public class NetDefinitionParser implements BeanDefinitionParser { // 注册ConfigManager clazz = ConfigManager.class; builder = BeanDefinitionBuilder.rootBeanDefinition(clazz); + // 把Spring容器中的NetConfig引用赋值给ConfigManager中的localConfig属性 builder.addPropertyReference("localConfig", NetConfig.class.getCanonicalName()); parserContext.getRegistry().registerBeanDefinition(clazz.getCanonicalName(), builder.getBeanDefinition()); @@ -74,6 +75,7 @@ public class NetDefinitionParser implements BeanDefinitionParser { parserContext.getRegistry().registerBeanDefinition(clazz.getCanonicalName(), builder.getBeanDefinition()); // 注册SessionManager + // 里面的serverSessionMap 和 clientSessionMap 是根据自己是客户端还是服务器保存连接自己 和 自己连接 的网络节点信息 clazz = SessionManager.class; builder = BeanDefinitionBuilder.rootBeanDefinition(clazz); parserContext.getRegistry().registerBeanDefinition(clazz.getCanonicalName(), builder.getBeanDefinition()); diff --git a/net/src/main/java/com/zfoo/net/task/TaskBus.java b/net/src/main/java/com/zfoo/net/task/TaskBus.java index 7e4fd744..069869dd 100644 --- a/net/src/main/java/com/zfoo/net/task/TaskBus.java +++ b/net/src/main/java/com/zfoo/net/task/TaskBus.java @@ -80,6 +80,7 @@ public final class TaskBus { * SignalAttachment:executorConsistentHash通过IRouter和IConsumer的argument参数指定 */ public static void submit(PacketReceiverTask task) { + // 里面会看到是:其中一致性hash是根据附加包记录的hashId进行选择哪个线程进行业务处理 taskDispatch.getExecutor(task).execute(task); } diff --git a/net/src/main/java/com/zfoo/net/task/dispatcher/ConsistentHashTaskDispatch.java b/net/src/main/java/com/zfoo/net/task/dispatcher/ConsistentHashTaskDispatch.java index d350ced8..17d0cd8b 100644 --- a/net/src/main/java/com/zfoo/net/task/dispatcher/ConsistentHashTaskDispatch.java +++ b/net/src/main/java/com/zfoo/net/task/dispatcher/ConsistentHashTaskDispatch.java @@ -38,6 +38,7 @@ public class ConsistentHashTaskDispatch extends AbstractTaskDispatch { return SessionIdTaskDispatch.getInstance().getExecutor(packetReceiverTask); } + // 可见最终是根据附加包的信息选择服务端由哪个线程执行这个业务 return TaskBus.executor(attachment.executorConsistentHash()); } diff --git a/net/src/test/java/com/zfoo/net/config/RegistryTest.java b/net/src/test/java/com/zfoo/net/config/RegistryTest.java index 4efd5e0d..d5062ae2 100644 --- a/net/src/test/java/com/zfoo/net/config/RegistryTest.java +++ b/net/src/test/java/com/zfoo/net/config/RegistryTest.java @@ -34,25 +34,53 @@ public class RegistryTest { @Test public void registerVoTest() { + // 定义2个模块:可以为服务提供者用,也可以为服务消费者用,这个仅仅是模块信息 var protocolModule1 = new ProtocolModule((byte) 100, "aaa"); var protocolModule2 = new ProtocolModule((byte) 120, "bbb"); + + // 服务提供者模块列表和服务提供者配置 + // 定义2个服务提供者模块 var providerModules = List.of(new ProviderModule(protocolModule1, "a"), new ProviderModule(protocolModule2, "b")); + // 服务器提供者配置:服务提供者的ip + 服务提供者模块 var providerConfig = ProviderConfig.valueOf(HostAndPort.valueOf("127.0.0.1", 80).toHostAndPortStr(), providerModules); + // 服务消费者模块和服务消费者配置(服务消费者模块多一个负载均衡属性) var consumerModules = List.of(new ConsumerModule(protocolModule1, "random", "a"), new ConsumerModule(protocolModule2, "random", "b")); + // 服务消费者配置:这个是没Ip的 var consumerConfig = ConsumerConfig.valueOf(consumerModules); var vo = RegisterVO.valueOf("test", providerConfig, consumerConfig); var voStr = vo.toString(); + + // test | 127.0.0.1:80 | provider:[100-aaa-a, 120-bbb-b] | consumer:[100-aaa-random-a, 120-bbb-random-b] System.out.println(voStr); + var newVo = RegisterVO.parseString(voStr); Assert.assertEquals(vo, newVo); + // /127.0.0.1 System.out.println(NetUtil.LOCALHOST); + + // localhost/127.0.0.1 System.out.println(NetUtil.LOCALHOST4); + + //localhost/0:0:0:0:0:0:0:1 System.out.println(NetUtil.LOCALHOST6); + + // 200 System.out.println(NetUtil.SOMAXCONN); + + // name:lo (Software Loopback Interface 1) System.out.println(NetUtil.LOOPBACK_IF); } } + +/* +test | 127.0.0.1:80 | provider:[100-aaa-a, 120-bbb-b] | consumer:[100-aaa-random-a, 120-bbb-random-b] +/127.0.0.1 +localhost/127.0.0.1 +localhost/0:0:0:0:0:0:0:1 +200 +name:lo (Software Loopback Interface 1) + */ \ No newline at end of file diff --git a/net/src/test/java/com/zfoo/net/core/tcpAsync/client/TcpClientTest.java b/net/src/test/java/com/zfoo/net/core/tcpAsync/client/TcpClientTest.java index 19db30cd..c8a2f89a 100644 --- a/net/src/test/java/com/zfoo/net/core/tcpAsync/client/TcpClientTest.java +++ b/net/src/test/java/com/zfoo/net/core/tcpAsync/client/TcpClientTest.java @@ -40,12 +40,19 @@ public class TcpClientTest { var context = new ClassPathXmlApplicationContext("config.xml"); var client = new TcpClient(HostAndPort.valueOf("127.0.0.1:9000")); + + // 得到连接到服务器的session var session = client.start(); for (int i = 0; i < 1000; i++) { var ask = new AsyncMessAsk(); ask.setMessage("Hello, this is async client!"); + // Router只是一个接口,它的赋值也是在容器初始化好之后从Spring容器获取到的Router对象 + // 要发送网络数据,要通过方法asyncAsk,第一个参数就是要发送消息的session + // 注意:第3个argument参数是当收到消息后,是要要到哪个线程处理,这是CompletableFuture保证的 + // 其实这个依然是通过session,然后调用send发送过去的消息 + // 最终依然是通过BaseRouteHandler得到响应,只不过attachment不为空,会触发CompletableFuture的completable方法,从而得到回调结果 NetContext.getRouter().asyncAsk(session, ask, AsyncMessAnswer.class, null) .whenComplete(answer -> { logger.info("异步请求收到结果[{}]", JsonUtils.object2String(answer)); diff --git a/net/src/test/java/com/zfoo/net/core/tcpAsync/server/TcpServerTest.java b/net/src/test/java/com/zfoo/net/core/tcpAsync/server/TcpServerTest.java index c6f9f581..a4133514 100644 --- a/net/src/test/java/com/zfoo/net/core/tcpAsync/server/TcpServerTest.java +++ b/net/src/test/java/com/zfoo/net/core/tcpAsync/server/TcpServerTest.java @@ -29,12 +29,53 @@ public class TcpServerTest { @Test public void startServer() { + // 启动流程笔记: + // 首先,这里是xml配置,这样子就会解析xml配置中的标签 + // 当遇到自定义标签后,就会触发NamespaceHandler 和 NetDefinitionParser 的自定义标签解析流程 + // 接着就会往Spring容器注册相关的各种bean var context = new ClassPathXmlApplicationContext("config.xml"); + // 打印下当前spring管理的bean信息 + // 可以得出结论:通过解析自定义标签,往spring容器中自己主动注册的几个 + // 和网络有关的全类名的bean(NetConfig、NetContext、ConfigManager、PacketService、Router、Consumer、SessionManager) + String[] beanNames = context.getBeanDefinitionNames(); + for (String beanName : beanNames) { + System.out.println(beanName); + } + + // 启动起来一个Tcp服务器 + // 注意:由于之前往Spring容器中注册的SessionManager,从而可以使用这个bean变量管理所有的连接信息 + // 这也是spring的好处:很多东西是解耦的,让SessionManager的初始化交给的是spring,而不是自己从main函数开始一个个组件去初始化 var server = new TcpServer(HostAndPort.valueOf("127.0.0.1:9000")); server.start(); ThreadUtils.sleep(Long.MAX_VALUE); } - } + +/* +serverPacketController +gatewayProviderController +JProtobufTcpClientController +JProtobufTcpController +providerController +tcpClientController +tcpServerController +tcpAsyncController +tcpSyncController +udpClientPacketController +udpServerPacketController +websocketClientPacketController +websocketServerPacketController +org.springframework.context.annotation.internalConfigurationAnnotationProcessor +org.springframework.context.annotation.internalAutowiredAnnotationProcessor +org.springframework.context.event.internalEventListenerProcessor +org.springframework.context.event.internalEventListenerFactory +com.zfoo.net.config.model.NetConfig +com.zfoo.net.NetContext +com.zfoo.net.config.manager.ConfigManager +com.zfoo.net.packet.service.PacketService +com.zfoo.net.router.Router +com.zfoo.net.consumer.Consumer +com.zfoo.net.session.manager.SessionManager + */ diff --git a/protocol/src/main/java/com/zfoo/protocol/registration/ProtocolModule.java b/protocol/src/main/java/com/zfoo/protocol/registration/ProtocolModule.java index 2d468d96..d7f57e1f 100644 --- a/protocol/src/main/java/com/zfoo/protocol/registration/ProtocolModule.java +++ b/protocol/src/main/java/com/zfoo/protocol/registration/ProtocolModule.java @@ -16,6 +16,8 @@ package com.zfoo.protocol.registration; import com.zfoo.protocol.util.StringUtils; /** + * 模块id和模块名的封装 + * * @author jaysunxiao * @version 3.0 */ From 64e169d816a51277ca32de1eb8cdfce890a831a0 Mon Sep 17 00:00:00 2001 From: jianan <1072772483@qq.com> Date: Thu, 7 Jul 2022 01:37:10 +0800 Subject: [PATCH 2/2] =?UTF-8?q?perf[module]:=20GatewayTest=E5=90=AF?= =?UTF-8?q?=E5=8A=A8=E6=8A=A5=E9=94=99=E4=BF=AE=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../gateway/gateway_consistent_session_config.xml | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/net/src/test/resources/gateway/gateway_consistent_session_config.xml b/net/src/test/resources/gateway/gateway_consistent_session_config.xml index 28e5eb49..3c2f1fd6 100644 --- a/net/src/test/resources/gateway/gateway_consistent_session_config.xml +++ b/net/src/test/resources/gateway/gateway_consistent_session_config.xml @@ -24,9 +24,13 @@ - - - + + + + + + +