From f398ecf5b90f1176ee96e67343b19f1ca64adf97 Mon Sep 17 00:00:00 2001 From: godotg Date: Sun, 10 Sep 2023 15:01:42 +0800 Subject: [PATCH] ref[net]: json tcp server --- .../router/attachment/SignalAttachment.java | 2 + .../zfoo/net/core/gateway/GatewayTest.java | 39 ++++++++++++++++++- .../json/client/JsonWebsocketClientTest.java | 23 ++++++++++- 3 files changed, 61 insertions(+), 3 deletions(-) diff --git a/net/src/main/java/com/zfoo/net/router/attachment/SignalAttachment.java b/net/src/main/java/com/zfoo/net/router/attachment/SignalAttachment.java index 015d2dff..4692f96d 100644 --- a/net/src/main/java/com/zfoo/net/router/attachment/SignalAttachment.java +++ b/net/src/main/java/com/zfoo/net/router/attachment/SignalAttachment.java @@ -13,6 +13,7 @@ package com.zfoo.net.router.attachment; import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.zfoo.protocol.anno.Protocol; import com.zfoo.scheduler.util.TimeUtils; @@ -24,6 +25,7 @@ import java.util.concurrent.atomic.AtomicInteger; * @version 3.0 */ @Protocol(id = 0) +@JsonIgnoreProperties("responseFuture") public class SignalAttachment { /** diff --git a/net/src/test/java/com/zfoo/net/core/gateway/GatewayTest.java b/net/src/test/java/com/zfoo/net/core/gateway/GatewayTest.java index 6cc11acb..ad0bd96e 100644 --- a/net/src/test/java/com/zfoo/net/core/gateway/GatewayTest.java +++ b/net/src/test/java/com/zfoo/net/core/gateway/GatewayTest.java @@ -29,6 +29,7 @@ import org.springframework.context.support.ClassPathXmlApplicationContext; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; /** * 这是客户端连接网关,网关转发到服务提供者的测试用例 @@ -88,7 +89,7 @@ public class GatewayTest { * 这里是客户端,客户端先请求数据到到网关(毕竟自己连接的就是网关) */ @Test - public void clientTest() { + public void clientSyncTest() { var context = new ClassPathXmlApplicationContext("gateway/gateway_client_config.xml"); SessionUtils.printSessionInfo(); @@ -122,4 +123,40 @@ public class GatewayTest { ThreadUtils.sleep(Long.MAX_VALUE); } + @Test + public void clientAsyncTest() { + var context = new ClassPathXmlApplicationContext("gateway/gateway_client_config.xml"); + SessionUtils.printSessionInfo(); + + // 这里的地址是网关的地址 + var client = new TcpClient(HostAndPort.valueOf("127.0.0.1:9000")); + var session = client.start(); + + var executorSize = Runtime.getRuntime().availableProcessors() * 2; + var executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); + + var request = new GatewayToProviderRequest(); + request.setMessage("Hello, this is the client!"); + var atomicInteger = new AtomicInteger(0); + + for (int i = 0; i < executorSize; i++) { + var thread = new Thread(() -> { + for (int j = 0; j < 1000; j++) { + try { + // 注意:这里的第2个请求参数是 xxxRequest,不是xxxAsk。 因为这里是网关要将数据转发给Provider的,因此当然不能是xxxAsk这种请求。 + // 第3个参数argument是null,这样子随机一个服务提供者进行消息处理 + NetContext.getRouter().asyncAsk(session, request, GatewayToProviderResponse.class, null) + .whenComplete(response -> { + logger.info("客户端请求[{}]收到消息[{}]", atomicInteger.incrementAndGet(), JsonUtils.object2String(response)); + }); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + executor.execute(thread); + } + + ThreadUtils.sleep(Long.MAX_VALUE); + } } diff --git a/net/src/test/java/com/zfoo/net/core/json/client/JsonWebsocketClientTest.java b/net/src/test/java/com/zfoo/net/core/json/client/JsonWebsocketClientTest.java index 4a7d6656..c73ca95c 100644 --- a/net/src/test/java/com/zfoo/net/core/json/client/JsonWebsocketClientTest.java +++ b/net/src/test/java/com/zfoo/net/core/json/client/JsonWebsocketClientTest.java @@ -15,14 +15,21 @@ package com.zfoo.net.core.json.client; import com.zfoo.net.NetContext; import com.zfoo.net.core.HostAndPort; +import com.zfoo.net.core.jprotobuf.server.JProtobufTcpController; import com.zfoo.net.core.json.JsonWebsocketClient; import com.zfoo.net.packet.json.JsonHelloRequest; +import com.zfoo.net.packet.json.JsonHelloResponse; +import com.zfoo.protocol.util.JsonUtils; import com.zfoo.protocol.util.ThreadUtils; import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolConfig; import org.junit.Ignore; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.context.support.ClassPathXmlApplicationContext; +import java.util.function.Consumer; + /** * @author godotg * @version 3.0 @@ -30,8 +37,10 @@ import org.springframework.context.support.ClassPathXmlApplicationContext; @Ignore public class JsonWebsocketClientTest { + private static final Logger logger = LoggerFactory.getLogger(JsonWebsocketClientTest.class); + @Test - public void startClient() { + public void startClient() throws Exception { var context = new ClassPathXmlApplicationContext("config.xml"); var webSocketClientProtocolConfig = WebSocketClientProtocolConfig.newBuilder() @@ -45,8 +54,18 @@ public class JsonWebsocketClientTest { request.setMessage("Hello, this is the json client!"); for (int i = 0; i < 1000; i++) { - ThreadUtils.sleep(2000); + ThreadUtils.sleep(1000); NetContext.getRouter().send(session, request); + ThreadUtils.sleep(1000); + var response = NetContext.getRouter().syncAsk(session, request, JsonHelloResponse.class, session.getSid()).packet(); + logger.info("sync json client receive [packet:{}] from server", JsonUtils.object2String(response)); + NetContext.getRouter().asyncAsk(session, request, JsonHelloResponse.class, session.getSid()) + .whenComplete(new Consumer() { + @Override + public void accept(JsonHelloResponse jsonHelloResponse) { + logger.info("async json client receive [packet:{}] from server", JsonUtils.object2String(jsonHelloResponse)); + } + }); } ThreadUtils.sleep(Long.MAX_VALUE);