ref[net]: json tcp server

This commit is contained in:
godotg
2023-09-10 15:01:42 +08:00
parent 43872ec804
commit f398ecf5b9
3 changed files with 61 additions and 3 deletions
@@ -13,6 +13,7 @@
package com.zfoo.net.router.attachment;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.zfoo.protocol.anno.Protocol;
import com.zfoo.scheduler.util.TimeUtils;
@@ -24,6 +25,7 @@ import java.util.concurrent.atomic.AtomicInteger;
* @version 3.0
*/
@Protocol(id = 0)
@JsonIgnoreProperties("responseFuture")
public class SignalAttachment {
/**
@@ -29,6 +29,7 @@ import org.springframework.context.support.ClassPathXmlApplicationContext;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
/**
* 这是客户端连接网关,网关转发到服务提供者的测试用例
@@ -88,7 +89,7 @@ public class GatewayTest {
* 这里是客户端,客户端先请求数据到到网关(毕竟自己连接的就是网关)
*/
@Test
public void clientTest() {
public void clientSyncTest() {
var context = new ClassPathXmlApplicationContext("gateway/gateway_client_config.xml");
SessionUtils.printSessionInfo();
@@ -122,4 +123,40 @@ public class GatewayTest {
ThreadUtils.sleep(Long.MAX_VALUE);
}
@Test
public void clientAsyncTest() {
var context = new ClassPathXmlApplicationContext("gateway/gateway_client_config.xml");
SessionUtils.printSessionInfo();
// 这里的地址是网关的地址
var client = new TcpClient(HostAndPort.valueOf("127.0.0.1:9000"));
var session = client.start();
var executorSize = Runtime.getRuntime().availableProcessors() * 2;
var executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
var request = new GatewayToProviderRequest();
request.setMessage("Hello, this is the client!");
var atomicInteger = new AtomicInteger(0);
for (int i = 0; i < executorSize; i++) {
var thread = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
try {
// 注意:这里的第2个请求参数是 xxxRequest,不是xxxAsk。 因为这里是网关要将数据转发给Provider的,因此当然不能是xxxAsk这种请求。
// 第3个参数argument是null,这样子随机一个服务提供者进行消息处理
NetContext.getRouter().asyncAsk(session, request, GatewayToProviderResponse.class, null)
.whenComplete(response -> {
logger.info("客户端请求[{}]收到消息[{}]", atomicInteger.incrementAndGet(), JsonUtils.object2String(response));
});
} catch (Exception e) {
e.printStackTrace();
}
}
});
executor.execute(thread);
}
ThreadUtils.sleep(Long.MAX_VALUE);
}
}
@@ -15,14 +15,21 @@ package com.zfoo.net.core.json.client;
import com.zfoo.net.NetContext;
import com.zfoo.net.core.HostAndPort;
import com.zfoo.net.core.jprotobuf.server.JProtobufTcpController;
import com.zfoo.net.core.json.JsonWebsocketClient;
import com.zfoo.net.packet.json.JsonHelloRequest;
import com.zfoo.net.packet.json.JsonHelloResponse;
import com.zfoo.protocol.util.JsonUtils;
import com.zfoo.protocol.util.ThreadUtils;
import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolConfig;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import java.util.function.Consumer;
/**
* @author godotg
* @version 3.0
@@ -30,8 +37,10 @@ import org.springframework.context.support.ClassPathXmlApplicationContext;
@Ignore
public class JsonWebsocketClientTest {
private static final Logger logger = LoggerFactory.getLogger(JsonWebsocketClientTest.class);
@Test
public void startClient() {
public void startClient() throws Exception {
var context = new ClassPathXmlApplicationContext("config.xml");
var webSocketClientProtocolConfig = WebSocketClientProtocolConfig.newBuilder()
@@ -45,8 +54,18 @@ public class JsonWebsocketClientTest {
request.setMessage("Hello, this is the json client!");
for (int i = 0; i < 1000; i++) {
ThreadUtils.sleep(2000);
ThreadUtils.sleep(1000);
NetContext.getRouter().send(session, request);
ThreadUtils.sleep(1000);
var response = NetContext.getRouter().syncAsk(session, request, JsonHelloResponse.class, session.getSid()).packet();
logger.info("sync json client receive [packet:{}] from server", JsonUtils.object2String(response));
NetContext.getRouter().asyncAsk(session, request, JsonHelloResponse.class, session.getSid())
.whenComplete(new Consumer<JsonHelloResponse>() {
@Override
public void accept(JsonHelloResponse jsonHelloResponse) {
logger.info("async json client receive [packet:{}] from server", JsonUtils.object2String(jsonHelloResponse));
}
});
}
ThreadUtils.sleep(Long.MAX_VALUE);