ref[net]: 重构net

This commit is contained in:
jaysunxiao
2021-10-13 23:24:33 +08:00
parent e8aab562aa
commit 526a59d88b
46 changed files with 153 additions and 150 deletions
@@ -15,9 +15,9 @@ package com.zfoo.boot;
import com.zfoo.net.NetContext;
import com.zfoo.net.config.manager.ConfigManager;
import com.zfoo.net.config.model.NetConfig;
import com.zfoo.net.consumer.service.Consumer;
import com.zfoo.net.dispatcher.manager.PacketDispatcher;
import com.zfoo.net.consumer.Consumer;
import com.zfoo.net.packet.service.PacketService;
import com.zfoo.net.router.Router;
import com.zfoo.net.session.manager.SessionManager;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
@@ -48,8 +48,8 @@ public class NetAutoConfiguration {
@Bean
@ConditionalOnMissingBean
public PacketDispatcher packetDispatcher() {
return new PacketDispatcher();
public Router packetDispatcher() {
return new Router();
}
@Bean
+15 -14
View File
@@ -14,11 +14,11 @@
package com.zfoo.net;
import com.zfoo.net.config.manager.IConfigManager;
import com.zfoo.net.consumer.service.IConsumer;
import com.zfoo.net.consumer.IConsumer;
import com.zfoo.net.core.AbstractClient;
import com.zfoo.net.core.AbstractServer;
import com.zfoo.net.dispatcher.manager.IPacketDispatcher;
import com.zfoo.net.packet.service.IPacketService;
import com.zfoo.net.router.IRouter;
import com.zfoo.net.session.manager.ISessionManager;
import com.zfoo.net.session.model.Session;
import com.zfoo.net.task.TaskBus;
@@ -54,15 +54,16 @@ public class NetContext implements ApplicationListener<ApplicationContextEvent>,
private ApplicationContext applicationContext;
private IRouter router;
private IConsumer consumer;
private IConfigManager configManager;
private IPacketService packetService;
private IPacketDispatcher packetDispatcher;
private ISessionManager sessionManager;
private IConsumer consumer;
public static NetContext getNetContext() {
return instance;
@@ -72,6 +73,14 @@ public class NetContext implements ApplicationListener<ApplicationContextEvent>,
return instance.applicationContext;
}
public static IRouter getRouter() {
return instance.router;
}
public static IConsumer getConsumer() {
return instance.consumer;
}
public static IConfigManager getConfigManager() {
return instance.configManager;
}
@@ -84,14 +93,6 @@ public class NetContext implements ApplicationListener<ApplicationContextEvent>,
return instance.sessionManager;
}
public static IPacketDispatcher getDispatcher() {
return instance.packetDispatcher;
}
public static IConsumer getConsumer() {
return instance.consumer;
}
@Override
public void onApplicationEvent(ApplicationContextEvent event) {
if (event instanceof ContextRefreshedEvent) {
@@ -100,7 +101,7 @@ public class NetContext implements ApplicationListener<ApplicationContextEvent>,
instance.applicationContext = event.getApplicationContext();
instance.configManager = applicationContext.getBean(IConfigManager.class);
instance.packetService = applicationContext.getBean(IPacketService.class);
instance.packetDispatcher = applicationContext.getBean(IPacketDispatcher.class);
instance.router = applicationContext.getBean(IRouter.class);
instance.consumer = applicationContext.getBean(IConsumer.class);
instance.sessionManager = applicationContext.getBean(ISessionManager.class);
@@ -1,6 +1,5 @@
/*
* Copyright (C) 2020 The zfoo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
@@ -9,21 +8,22 @@
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
*
*/
package com.zfoo.net.consumer.service;
package com.zfoo.net.consumer;
import com.zfoo.net.NetContext;
import com.zfoo.net.dispatcher.manager.PacketDispatcher;
import com.zfoo.net.dispatcher.manager.PacketSignal;
import com.zfoo.net.dispatcher.model.answer.AsyncAnswer;
import com.zfoo.net.dispatcher.model.answer.SyncAnswer;
import com.zfoo.net.dispatcher.model.exception.ErrorResponseException;
import com.zfoo.net.dispatcher.model.exception.NetTimeOutException;
import com.zfoo.net.dispatcher.model.exception.UnexpectedProtocolException;
import com.zfoo.net.packet.common.Error;
import com.zfoo.net.packet.model.NoAnswerAttachment;
import com.zfoo.net.packet.model.SignalPacketAttachment;
import com.zfoo.net.router.Router;
import com.zfoo.net.router.answer.AsyncAnswer;
import com.zfoo.net.router.answer.SyncAnswer;
import com.zfoo.net.router.exception.ErrorResponseException;
import com.zfoo.net.router.exception.NetTimeOutException;
import com.zfoo.net.router.exception.UnexpectedProtocolException;
import com.zfoo.net.router.route.PacketSignal;
import com.zfoo.protocol.IPacket;
import com.zfoo.protocol.util.JsonUtils;
import com.zfoo.protocol.util.StringUtils;
@@ -54,7 +54,7 @@ public class Consumer implements IConsumer {
var loadBalancer = NetContext.getConfigManager().consumerLoadBalancer();
var session = loadBalancer.loadBalancer(packet, argument);
var executorConsistentHash = (argument == null) ? RandomUtils.randomInt() : HashUtils.fnvHash(argument);
NetContext.getDispatcher().send(session, packet, NoAnswerAttachment.valueOf(executorConsistentHash));
NetContext.getRouter().send(session, packet, NoAnswerAttachment.valueOf(executorConsistentHash));
} catch (Throwable t) {
logger.error("consumer发送未知异常", t);
}
@@ -77,9 +77,9 @@ public class Consumer implements IConsumer {
// load balancer之前调用
loadBalancer.beforeLoadBalancer(session, packet, clientAttachment);
NetContext.getDispatcher().send(session, packet, clientAttachment);
NetContext.getRouter().send(session, packet, clientAttachment);
IPacket responsePacket = clientAttachment.getResponseFuture().get(PacketDispatcher.DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
IPacket responsePacket = clientAttachment.getResponseFuture().get(Router.DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
if (responsePacket.protocolId() == Error.errorProtocolId()) {
throw new ErrorResponseException((Error) responsePacket);
@@ -105,7 +105,7 @@ public class Consumer implements IConsumer {
public <T extends IPacket> AsyncAnswer<T> asyncAsk(IPacket packet, Class<T> answerClass, Object argument) {
var loadBalancer = NetContext.getConfigManager().consumerLoadBalancer();
var session = loadBalancer.loadBalancer(packet, argument);
var asyncAnswer = NetContext.getDispatcher().asyncAsk(session, packet, answerClass, argument);
var asyncAnswer = NetContext.getRouter().asyncAsk(session, packet, answerClass, argument);
// load balancer之前调用
loadBalancer.beforeLoadBalancer(session, packet, asyncAnswer.getFutureAttachment());
@@ -1,6 +1,5 @@
/*
* Copyright (C) 2020 The zfoo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
@@ -9,12 +8,13 @@
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
*
*/
package com.zfoo.net.consumer.service;
package com.zfoo.net.consumer;
import com.zfoo.net.dispatcher.model.answer.AsyncAnswer;
import com.zfoo.net.dispatcher.model.answer.SyncAnswer;
import com.zfoo.net.router.answer.AsyncAnswer;
import com.zfoo.net.router.answer.SyncAnswer;
import com.zfoo.protocol.IPacket;
import org.springframework.lang.Nullable;
@@ -65,7 +65,7 @@ public class BaseDispatcherHandler extends ChannelInboundHandlerAdapter {
return;
}
DecodedPacketInfo decodedPacketInfo = (DecodedPacketInfo) msg;
NetContext.getDispatcher().receive(session, decodedPacketInfo.getPacket(), decodedPacketInfo.getPacketAttachment());
NetContext.getRouter().receive(session, decodedPacketInfo.getPacket(), decodedPacketInfo.getPacketAttachment());
}
@Override
@@ -68,7 +68,7 @@ public class GatewayDispatcherHandler extends ServerDispatcherHandler {
return;
}
if (packet.protocolId() == Ping.pingProtocolId()) {
NetContext.getDispatcher().send(session, Pong.valueOf(TimeUtils.now()), null);
NetContext.getRouter().send(session, Pong.valueOf(TimeUtils.now()), null);
return;
}
@@ -107,7 +107,7 @@ public class GatewayDispatcherHandler extends ServerDispatcherHandler {
private void forwardingPacket(IPacket packet, IPacketAttachment attachment, Object argument) {
try {
var consumerSession = ConsistentHashConsumerLoadBalancer.getInstance().loadBalancer(packet, argument);
NetContext.getDispatcher().send(consumerSession, packet, attachment);
NetContext.getRouter().send(consumerSession, packet, attachment);
} catch (Exception e) {
logger.error("网关发生异常", e);
} catch (Throwable t) {
@@ -14,9 +14,9 @@
package com.zfoo.net.packet.service;
import com.zfoo.net.NetContext;
import com.zfoo.net.dispatcher.manager.PacketBus;
import com.zfoo.net.packet.model.DecodedPacketInfo;
import com.zfoo.net.packet.model.IPacketAttachment;
import com.zfoo.net.router.route.PacketRoute;
import com.zfoo.protocol.IPacket;
import com.zfoo.protocol.ProtocolManager;
import com.zfoo.protocol.buffer.ByteBufUtils;
@@ -110,7 +110,7 @@ public class PacketService implements IPacketService {
var beanNames = applicationContext.getBeanDefinitionNames();
for (var beanName : beanNames) {
var bean = applicationContext.getBean(beanName);
PacketBus.registerPacketReceiverDefinition(bean);
PacketRoute.registerPacketReceiverDefinition(bean);
}
}
@@ -1,6 +1,5 @@
/*
* Copyright (C) 2020 The zfoo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
@@ -9,13 +8,14 @@
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
*
*/
package com.zfoo.net.dispatcher.manager;
package com.zfoo.net.router;
import com.zfoo.net.dispatcher.model.answer.AsyncAnswer;
import com.zfoo.net.dispatcher.model.answer.SyncAnswer;
import com.zfoo.net.packet.model.IPacketAttachment;
import com.zfoo.net.router.answer.AsyncAnswer;
import com.zfoo.net.router.answer.SyncAnswer;
import com.zfoo.net.session.model.Session;
import com.zfoo.protocol.IPacket;
import org.springframework.lang.Nullable;
@@ -25,7 +25,7 @@ import org.springframework.lang.Nullable;
* @author jaysunxiao
* @version 3.0
*/
public interface IPacketDispatcher {
public interface IRouter {
void send(Session session, IPacket packet);
@@ -1,6 +1,5 @@
/*
* Copyright (C) 2020 The zfoo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
@@ -9,30 +8,33 @@
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
*
*/
package com.zfoo.net.dispatcher.manager;
package com.zfoo.net.router;
import com.zfoo.event.manager.EventBus;
import com.zfoo.net.NetContext;
import com.zfoo.net.core.gateway.model.AuthUidToGatewayCheck;
import com.zfoo.net.core.gateway.model.AuthUidToGatewayConfirm;
import com.zfoo.net.core.gateway.model.AuthUidToGatewayEvent;
import com.zfoo.net.dispatcher.model.answer.AsyncAnswer;
import com.zfoo.net.dispatcher.model.answer.SyncAnswer;
import com.zfoo.net.dispatcher.model.exception.ErrorResponseException;
import com.zfoo.net.dispatcher.model.exception.NetTimeOutException;
import com.zfoo.net.dispatcher.model.exception.UnexpectedProtocolException;
import com.zfoo.net.packet.common.Error;
import com.zfoo.net.packet.common.Heartbeat;
import com.zfoo.net.packet.model.EncodedPacketInfo;
import com.zfoo.net.packet.model.GatewayPacketAttachment;
import com.zfoo.net.packet.model.IPacketAttachment;
import com.zfoo.net.packet.model.SignalPacketAttachment;
import com.zfoo.net.router.answer.AsyncAnswer;
import com.zfoo.net.router.answer.SyncAnswer;
import com.zfoo.net.router.exception.ErrorResponseException;
import com.zfoo.net.router.exception.NetTimeOutException;
import com.zfoo.net.router.exception.UnexpectedProtocolException;
import com.zfoo.net.router.route.PacketRoute;
import com.zfoo.net.router.route.PacketSignal;
import com.zfoo.net.session.model.AttributeType;
import com.zfoo.net.session.model.Session;
import com.zfoo.net.task.TaskBus;
import com.zfoo.net.task.model.ReceiveTask;
import com.zfoo.net.task.model.PacketReceiverTask;
import com.zfoo.protocol.IPacket;
import com.zfoo.protocol.exception.ExceptionUtils;
import com.zfoo.protocol.util.JsonUtils;
@@ -53,9 +55,9 @@ import java.util.concurrent.TimeoutException;
* @author jaysunxiao
* @version 3.0
*/
public class PacketDispatcher implements IPacketDispatcher {
public class Router implements IRouter {
private static final Logger logger = LoggerFactory.getLogger(PacketDispatcher.class);
private static final Logger logger = LoggerFactory.getLogger(Router.class);
public static final long DEFAULT_TIMEOUT = 3000;
@@ -118,7 +120,7 @@ public class PacketDispatcher implements IPacketDispatcher {
gatewaySession.putAttribute(AttributeType.UID, uid);
EventBus.asyncSubmit(AuthUidToGatewayEvent.valueOf(gatewaySession.getSid(), uid));
NetContext.getDispatcher().send(session, AuthUidToGatewayConfirm.valueOf(uid), new GatewayPacketAttachment(gatewaySession, null));
NetContext.getRouter().send(session, AuthUidToGatewayConfirm.valueOf(uid), new GatewayPacketAttachment(gatewaySession, null));
return;
}
send(gatewaySession, packet, signalAttachment);
@@ -136,7 +138,7 @@ public class PacketDispatcher implements IPacketDispatcher {
}
// 正常发送消息的接收
TaskBus.submit(new ReceiveTask(session, packet, packetAttachment));
TaskBus.submit(new PacketReceiverTask(session, packet, packetAttachment));
}
@Override
@@ -301,7 +303,7 @@ public class PacketDispatcher implements IPacketDispatcher {
}
// 调用PacketReceiver
PacketBus.submit(session, packet, packetAttachment);
PacketRoute.submit(session, packet, packetAttachment);
} catch (Exception e) {
logger.error(StringUtils.format("e[uid:{}][sid:{}]未知exception异常", session.getAttribute(AttributeType.UID), session.getSid(), e.getMessage()), e);
} catch (Throwable t) {
@@ -11,7 +11,7 @@
* See the License for the specific language governing permissions and limitations under the License.
*/
package com.zfoo.net.dispatcher.model.answer;
package com.zfoo.net.router.answer;
import com.zfoo.net.packet.model.SignalPacketAttachment;
import com.zfoo.net.task.model.SafeRunnable;
@@ -11,7 +11,7 @@
* See the License for the specific language governing permissions and limitations under the License.
*/
package com.zfoo.net.dispatcher.model.answer;
package com.zfoo.net.router.answer;
import com.zfoo.net.task.model.SafeRunnable;
import com.zfoo.protocol.IPacket;
@@ -11,7 +11,7 @@
* See the License for the specific language governing permissions and limitations under the License.
*/
package com.zfoo.net.dispatcher.model.answer;
package com.zfoo.net.router.answer;
import com.zfoo.net.packet.model.SignalPacketAttachment;
import com.zfoo.protocol.IPacket;
@@ -11,7 +11,7 @@
* See the License for the specific language governing permissions and limitations under the License.
*/
package com.zfoo.net.dispatcher.model.answer;
package com.zfoo.net.router.answer;
import com.zfoo.net.packet.model.SignalPacketAttachment;
import com.zfoo.protocol.IPacket;
@@ -11,7 +11,7 @@
* See the License for the specific language governing permissions and limitations under the License.
*/
package com.zfoo.net.dispatcher.model.exception;
package com.zfoo.net.router.exception;
import com.zfoo.net.packet.common.Error;
@@ -11,7 +11,7 @@
* See the License for the specific language governing permissions and limitations under the License.
*/
package com.zfoo.net.dispatcher.model.exception;
package com.zfoo.net.router.exception;
/**
* @author jaysunxiao
@@ -11,7 +11,7 @@
* See the License for the specific language governing permissions and limitations under the License.
*/
package com.zfoo.net.dispatcher.model.exception;
package com.zfoo.net.router.exception;
import com.zfoo.protocol.exception.RunException;
@@ -11,7 +11,7 @@
* See the License for the specific language governing permissions and limitations under the License.
*/
package com.zfoo.net.dispatcher.model.vo;
package com.zfoo.net.router.receiver;
import com.zfoo.net.packet.model.IPacketAttachment;
import com.zfoo.net.session.model.Session;
@@ -11,7 +11,7 @@
* See the License for the specific language governing permissions and limitations under the License.
*/
package com.zfoo.net.dispatcher.model.vo;
package com.zfoo.net.router.receiver;
import com.zfoo.net.packet.model.IPacketAttachment;
import com.zfoo.net.session.model.Session;
@@ -1,6 +1,5 @@
/*
* Copyright (C) 2020 The zfoo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
@@ -9,9 +8,10 @@
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
*
*/
package com.zfoo.net.dispatcher.model.anno;
package com.zfoo.net.router.receiver;
import java.lang.annotation.*;
@@ -11,7 +11,7 @@
* See the License for the specific language governing permissions and limitations under the License.
*/
package com.zfoo.net.dispatcher.model.vo;
package com.zfoo.net.router.receiver;
import com.zfoo.net.packet.model.IPacketAttachment;
import com.zfoo.net.session.model.Session;
@@ -11,18 +11,18 @@
* See the License for the specific language governing permissions and limitations under the License.
*/
package com.zfoo.net.dispatcher.manager;
package com.zfoo.net.router.route;
import com.zfoo.event.model.event.IEvent;
import com.zfoo.net.dispatcher.model.anno.PacketReceiver;
import com.zfoo.net.dispatcher.model.vo.EnhanceUtils;
import com.zfoo.net.dispatcher.model.vo.IPacketReceiver;
import com.zfoo.net.dispatcher.model.vo.PacketReceiverDefinition;
import com.zfoo.net.packet.model.GatewayPacketAttachment;
import com.zfoo.net.packet.model.HttpPacketAttachment;
import com.zfoo.net.packet.model.IPacketAttachment;
import com.zfoo.net.packet.model.UdpPacketAttachment;
import com.zfoo.net.packet.service.PacketService;
import com.zfoo.net.router.receiver.EnhanceUtils;
import com.zfoo.net.router.receiver.IPacketReceiver;
import com.zfoo.net.router.receiver.PacketReceiver;
import com.zfoo.net.router.receiver.PacketReceiverDefinition;
import com.zfoo.net.session.model.Session;
import com.zfoo.protocol.IPacket;
import com.zfoo.protocol.ProtocolManager;
@@ -41,9 +41,9 @@ import java.lang.reflect.Modifier;
* @author jaysunxiao
* @version 3.0
*/
public abstract class PacketBus {
public abstract class PacketRoute {
private static final Logger logger = LoggerFactory.getLogger(PacketBus.class);
private static final Logger logger = LoggerFactory.getLogger(PacketRoute.class);
/**
* 客户端和服务端都有接受packet的方法packetReceiverList对应的就是包的接收方法将receiver注册到IProtocolRegistration
@@ -10,7 +10,7 @@
* See the License for the specific language governing permissions and limitations under the License.
*/
package com.zfoo.net.dispatcher.manager;
package com.zfoo.net.router.route;
import com.zfoo.net.packet.model.SignalPacketAttachment;
import com.zfoo.protocol.util.JsonUtils;
@@ -16,9 +16,9 @@ package com.zfoo.net.schema;
import com.zfoo.net.NetContext;
import com.zfoo.net.config.manager.ConfigManager;
import com.zfoo.net.config.model.*;
import com.zfoo.net.consumer.service.Consumer;
import com.zfoo.net.dispatcher.manager.PacketDispatcher;
import com.zfoo.net.consumer.Consumer;
import com.zfoo.net.packet.service.PacketService;
import com.zfoo.net.router.Router;
import com.zfoo.net.session.manager.SessionManager;
import com.zfoo.protocol.registration.ProtocolModule;
import com.zfoo.protocol.util.DomUtils;
@@ -65,7 +65,7 @@ public class NetDefinitionParser implements BeanDefinitionParser {
parserContext.getRegistry().registerBeanDefinition(clazz.getCanonicalName(), builder.getBeanDefinition());
// 注册PacketDispatcherManager
clazz = PacketDispatcher.class;
clazz = Router.class;
builder = BeanDefinitionBuilder.rootBeanDefinition(clazz);
parserContext.getRegistry().registerBeanDefinition(clazz.getCanonicalName(), builder.getBeanDefinition());
@@ -14,9 +14,9 @@
package com.zfoo.net.task;
import com.zfoo.net.NetContext;
import com.zfoo.net.task.model.ReceiveTask;
import com.zfoo.net.task.route.AbstractTaskRoute;
import com.zfoo.net.task.route.ITaskRoute;
import com.zfoo.net.task.dispatcher.AbstractTaskDispatcher;
import com.zfoo.net.task.dispatcher.ITaskDispatcher;
import com.zfoo.net.task.model.PacketReceiverTask;
import com.zfoo.protocol.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,7 +35,7 @@ public final class TaskBus {
// 线程池的大小
public static final int EXECUTOR_SIZE;
private static final ITaskRoute taskRoute;
private static final ITaskDispatcher taskRoute;
/**
@@ -52,7 +52,7 @@ public final class TaskBus {
? "default" : providerConfig.getDispatchThread();
EXECUTOR_SIZE = "default".equals(dispatchThread) ? (Runtime.getRuntime().availableProcessors() + 1) : Integer.parseInt(dispatchThread);
taskRoute = AbstractTaskRoute.valueOf(dispatch);
taskRoute = AbstractTaskDispatcher.valueOf(dispatch);
executors = new ExecutorService[EXECUTOR_SIZE];
for (int i = 0; i < executors.length; i++) {
@@ -62,7 +62,7 @@ public final class TaskBus {
}
public static void submit(ReceiveTask task) {
public static void submit(PacketReceiverTask task) {
taskRoute.getExecutor(task).execute(task);
}
@@ -11,7 +11,7 @@
*
*/
package com.zfoo.net.task.route;
package com.zfoo.net.task.dispatcher;
import com.zfoo.protocol.util.StringUtils;
@@ -19,16 +19,16 @@ import com.zfoo.protocol.util.StringUtils;
* @author jaysunxiao
* @version 3.0
*/
public abstract class AbstractTaskRoute implements ITaskRoute {
public abstract class AbstractTaskDispatcher implements ITaskDispatcher {
public static ITaskRoute valueOf(String taskDispatchName) {
public static ITaskDispatcher valueOf(String taskDispatchName) {
switch (taskDispatchName) {
case "random":
return new RandomTaskRoute();
return new RandomTaskDispatcher();
case "sessionId":
return new SessionIdTaskRoute();
return new SessionIdTaskDispatcher();
case "consistent-hash":
return new ConsistentHashTaskRoute();
return new ConsistentHashTaskDispatcher();
default:
throw new RuntimeException(StringUtils.format("没有找到对应的taskDispatch[{}]", taskDispatchName));
}
@@ -11,10 +11,10 @@
*
*/
package com.zfoo.net.task.route;
package com.zfoo.net.task.dispatcher;
import com.zfoo.net.task.TaskBus;
import com.zfoo.net.task.model.ReceiveTask;
import com.zfoo.net.task.model.PacketReceiverTask;
import java.util.concurrent.ExecutorService;
@@ -22,20 +22,20 @@ import java.util.concurrent.ExecutorService;
* @author jaysunxiao
* @version 3.0
*/
public class ConsistentHashTaskRoute extends AbstractTaskRoute {
public class ConsistentHashTaskDispatcher extends AbstractTaskDispatcher {
private static ConsistentHashTaskRoute INSTANCE = new ConsistentHashTaskRoute();
private static ConsistentHashTaskDispatcher INSTANCE = new ConsistentHashTaskDispatcher();
public static ConsistentHashTaskRoute getINSTANCE() {
public static ConsistentHashTaskDispatcher getINSTANCE() {
return INSTANCE;
}
@Override
public ExecutorService getExecutor(ReceiveTask receiveTask) {
var packetAttachment = receiveTask.getPacketAttachment();
public ExecutorService getExecutor(PacketReceiverTask packetReceiverTask) {
var packetAttachment = packetReceiverTask.getPacketAttachment();
if (packetAttachment == null) {
return SessionIdTaskRoute.getInstance().getExecutor(receiveTask);
return SessionIdTaskDispatcher.getInstance().getExecutor(packetReceiverTask);
}
return TaskBus.executor(packetAttachment.executorConsistentHash());
@@ -11,9 +11,9 @@
*
*/
package com.zfoo.net.task.route;
package com.zfoo.net.task.dispatcher;
import com.zfoo.net.task.model.ReceiveTask;
import com.zfoo.net.task.model.PacketReceiverTask;
import java.util.concurrent.ExecutorService;
@@ -21,8 +21,8 @@ import java.util.concurrent.ExecutorService;
* @author jaysunxiao
* @version 3.0
*/
public interface ITaskRoute {
public interface ITaskDispatcher {
ExecutorService getExecutor(ReceiveTask receiveTask);
ExecutorService getExecutor(PacketReceiverTask packetReceiverTask);
}
@@ -11,10 +11,10 @@
*
*/
package com.zfoo.net.task.route;
package com.zfoo.net.task.dispatcher;
import com.zfoo.net.task.TaskBus;
import com.zfoo.net.task.model.ReceiveTask;
import com.zfoo.net.task.model.PacketReceiverTask;
import java.util.concurrent.ExecutorService;
@@ -22,16 +22,16 @@ import java.util.concurrent.ExecutorService;
* @author jaysunxiao
* @version 3.0
*/
public class RandomTaskRoute extends AbstractTaskRoute {
public class RandomTaskDispatcher extends AbstractTaskDispatcher {
private static final RandomTaskRoute INSTANCE = new RandomTaskRoute();
private static final RandomTaskDispatcher INSTANCE = new RandomTaskDispatcher();
public static RandomTaskRoute getInstance() {
public static RandomTaskDispatcher getInstance() {
return INSTANCE;
}
@Override
public ExecutorService getExecutor(ReceiveTask receiveTask) {
public ExecutorService getExecutor(PacketReceiverTask packetReceiverTask) {
return TaskBus.executor(-1);
}
@@ -11,10 +11,10 @@
*
*/
package com.zfoo.net.task.route;
package com.zfoo.net.task.dispatcher;
import com.zfoo.net.task.TaskBus;
import com.zfoo.net.task.model.ReceiveTask;
import com.zfoo.net.task.model.PacketReceiverTask;
import com.zfoo.util.math.HashUtils;
import java.util.concurrent.ExecutorService;
@@ -25,17 +25,17 @@ import java.util.concurrent.ExecutorService;
* @author jaysunxiao
* @version 3.0
*/
public class SessionIdTaskRoute extends AbstractTaskRoute {
public class SessionIdTaskDispatcher extends AbstractTaskDispatcher {
private static final SessionIdTaskRoute INSTANCE = new SessionIdTaskRoute();
private static final SessionIdTaskDispatcher INSTANCE = new SessionIdTaskDispatcher();
public static SessionIdTaskRoute getInstance() {
public static SessionIdTaskDispatcher getInstance() {
return INSTANCE;
}
@Override
public ExecutorService getExecutor(ReceiveTask receiveTask) {
var session = receiveTask.getSession();
public ExecutorService getExecutor(PacketReceiverTask packetReceiverTask) {
var session = packetReceiverTask.getSession();
return TaskBus.executor(HashUtils.fnvHash(session.getSid()));
}
@@ -22,13 +22,13 @@ import com.zfoo.protocol.IPacket;
* @author jaysunxiao
* @version 3.0
*/
public final class ReceiveTask implements Runnable {
public final class PacketReceiverTask implements Runnable {
private Session session;
private IPacket packet;
private IPacketAttachment packetAttachment;
public ReceiveTask(Session session, IPacket packet, IPacketAttachment packetAttachment) {
public PacketReceiverTask(Session session, IPacket packet, IPacketAttachment packetAttachment) {
this.session = session;
this.packet = packet;
this.packetAttachment = packetAttachment;
@@ -36,7 +36,7 @@ public final class ReceiveTask implements Runnable {
@Override
public void run() {
NetContext.getDispatcher().atReceiver(session, packet, packetAttachment);
NetContext.getRouter().atReceiver(session, packet, packetAttachment);
}
public Session getSession() {
@@ -14,8 +14,8 @@
package com.zfoo.net.core.csharp;
import com.zfoo.net.NetContext;
import com.zfoo.net.dispatcher.model.anno.PacketReceiver;
import com.zfoo.net.packet.csharp.CM_CSharpRequest;
import com.zfoo.net.router.receiver.PacketReceiver;
import com.zfoo.net.session.model.Session;
import com.zfoo.protocol.util.JsonUtils;
import org.springframework.stereotype.Component;
@@ -32,7 +32,7 @@ public class ServerPacketController {
System.out.println("receive packet from client:");
System.out.println(JsonUtils.object2String(cm));
NetContext.getDispatcher().send(session, cm);
NetContext.getRouter().send(session, cm);
}
}
@@ -13,10 +13,10 @@
package com.zfoo.net.core.gateway;
import com.zfoo.net.NetContext;
import com.zfoo.net.dispatcher.model.anno.PacketReceiver;
import com.zfoo.net.packet.gateway.GatewayToProviderRequest;
import com.zfoo.net.packet.gateway.GatewayToProviderResponse;
import com.zfoo.net.packet.model.GatewayPacketAttachment;
import com.zfoo.net.router.receiver.PacketReceiver;
import com.zfoo.net.session.model.Session;
import com.zfoo.protocol.util.JsonUtils;
import com.zfoo.protocol.util.StringUtils;
@@ -40,6 +40,6 @@ public class GatewayProviderController {
var response = new GatewayToProviderResponse();
response.setMessage(StringUtils.format("Hello, this is the [provider:{}] response!", NetContext.getConfigManager().getLocalConfig().toLocalRegisterVO().toString()));
NetContext.getDispatcher().send(session, response, gatewayAttachment);
NetContext.getRouter().send(session, response, gatewayAttachment);
}
}
@@ -91,7 +91,7 @@ public class GatewayTest {
var thread = new Thread(() -> {
for (int j = 0; j < 10000; j++) {
try {
var response = NetContext.getDispatcher().syncAsk(session, request, GatewayToProviderResponse.class, null).packet();
var response = NetContext.getRouter().syncAsk(session, request, GatewayToProviderResponse.class, null).packet();
logger.info("客户端请求[{}]收到消息[{}]", atomicInteger.incrementAndGet(), JsonUtils.object2String(response));
} catch (Exception e) {
e.printStackTrace();
@@ -13,9 +13,9 @@
package com.zfoo.net.core.provider;
import com.zfoo.net.NetContext;
import com.zfoo.net.dispatcher.model.anno.PacketReceiver;
import com.zfoo.net.packet.provider.ProviderMessAnswer;
import com.zfoo.net.packet.provider.ProviderMessAsk;
import com.zfoo.net.router.receiver.PacketReceiver;
import com.zfoo.net.session.model.Session;
import com.zfoo.protocol.util.JsonUtils;
import com.zfoo.protocol.util.StringUtils;
@@ -39,6 +39,6 @@ public class ProviderController {
var response = new ProviderMessAnswer();
response.setMessage(StringUtils.format("Hello, this is the [provider:{}] answer!", NetContext.getConfigManager().getLocalConfig().toLocalRegisterVO().toString()));
NetContext.getDispatcher().send(session, response);
NetContext.getRouter().send(session, response);
}
}
@@ -13,8 +13,8 @@
package com.zfoo.net.core.tcp.client;
import com.zfoo.net.dispatcher.model.anno.PacketReceiver;
import com.zfoo.net.packet.tcp.TcpHelloResponse;
import com.zfoo.net.router.receiver.PacketReceiver;
import com.zfoo.net.session.model.Session;
import com.zfoo.protocol.util.JsonUtils;
import org.slf4j.Logger;
@@ -53,7 +53,7 @@ public class TcpClientTest {
for (int i = 0; i < 1000; i++) {
ThreadUtils.sleep(2000);
NetContext.getDispatcher().send(session, request);
NetContext.getRouter().send(session, request);
}
ThreadUtils.sleep(Long.MAX_VALUE);
@@ -77,7 +77,7 @@ public class TcpClientTest {
for (int j = 0; j < 10000; j++) {
var ask = new SyncMessAsk();
ask.setMessage("Hello, this is sync client!");
var answer = NetContext.getDispatcher().syncAsk(session, ask, SyncMessAnswer.class, null).packet();
var answer = NetContext.getRouter().syncAsk(session, ask, SyncMessAnswer.class, null).packet();
logger.info("同步请求[{}]收到结果[{}]", atomicInteger.incrementAndGet(), JsonUtils.object2String(answer));
}
} catch (Exception e) {
@@ -106,7 +106,7 @@ public class TcpClientTest {
var ask = new AsyncMess0Ask();
ask.setMessage("Hello, client0 -> server0!");
NetContext.getDispatcher().asyncAsk(null, ask, AsyncMess0Answer.class, null)
NetContext.getRouter().asyncAsk(null, ask, AsyncMess0Answer.class, null)
.notComplete(new SafeRunnable() {
@Override
public void doRun() {
@@ -13,8 +13,8 @@
package com.zfoo.net.core.tcp.server;
import com.zfoo.net.NetContext;
import com.zfoo.net.dispatcher.model.anno.PacketReceiver;
import com.zfoo.net.packet.tcp.*;
import com.zfoo.net.router.receiver.PacketReceiver;
import com.zfoo.net.session.model.Session;
import com.zfoo.protocol.util.JsonUtils;
import org.slf4j.Logger;
@@ -37,7 +37,7 @@ public class TcpServerPacketController {
var response = new TcpHelloResponse();
response.setMessage("Hello, this is the tcp server!");
NetContext.getDispatcher().send(session, response);
NetContext.getRouter().send(session, response);
}
@PacketReceiver
@@ -57,7 +57,7 @@ public class TcpServerPacketController {
// 测试错误返回
// var sm = ErrorResponse.valueOf(1, 1, "this is error response");
NetContext.getDispatcher().send(session, answer);
NetContext.getRouter().send(session, answer);
}
@@ -68,13 +68,13 @@ public class TcpServerPacketController {
ask1.setMessage("Hello, server0 -> server1");
var client0 = NetContext.getSessionManager().getClientSession(0L);
NetContext.getDispatcher().asyncAsk(client0, ask1, AsyncMess1Answer.class, null)
NetContext.getRouter().asyncAsk(client0, ask1, AsyncMess1Answer.class, null)
.whenComplete(sm_asyncMess0 -> {
var answer = new AsyncMess0Answer();
answer.setMessage("Hello, server1 -> client!");
NetContext.getDispatcher().send(session, answer);
NetContext.getRouter().send(session, answer);
});
}
@@ -93,7 +93,7 @@ public class TcpServerPacketController {
// 测试错误返回
// var sm = ErrorResponse.valueOf(1, 1, "this is error response");
NetContext.getDispatcher().send(session, answer);
NetContext.getRouter().send(session, answer);
}
}
@@ -12,9 +12,9 @@
package com.zfoo.net.core.udp.client;
import com.zfoo.net.dispatcher.model.anno.PacketReceiver;
import com.zfoo.net.packet.model.UdpPacketAttachment;
import com.zfoo.net.packet.udp.UdpHelloResponse;
import com.zfoo.net.router.receiver.PacketReceiver;
import com.zfoo.net.session.model.Session;
import com.zfoo.protocol.util.JsonUtils;
import org.slf4j.Logger;
@@ -40,7 +40,7 @@ public class UdpClientTest {
var request = new UdpHelloRequest();
request.setMessage("Hello, this is the udp client!");
NetContext.getDispatcher().send(session, request, UdpPacketAttachment.valueOf(hostAndPort.getHost(), hostAndPort.getPort()));
NetContext.getRouter().send(session, request, UdpPacketAttachment.valueOf(hostAndPort.getHost(), hostAndPort.getPort()));
ThreadUtils.sleep(Long.MAX_VALUE);
}
@@ -13,10 +13,10 @@
package com.zfoo.net.core.udp.server;
import com.zfoo.net.NetContext;
import com.zfoo.net.dispatcher.model.anno.PacketReceiver;
import com.zfoo.net.packet.model.UdpPacketAttachment;
import com.zfoo.net.packet.udp.UdpHelloRequest;
import com.zfoo.net.packet.udp.UdpHelloResponse;
import com.zfoo.net.router.receiver.PacketReceiver;
import com.zfoo.net.session.model.Session;
import com.zfoo.protocol.util.JsonUtils;
import org.slf4j.Logger;
@@ -39,7 +39,7 @@ public class UdpServerPacketController {
var response = new UdpHelloResponse();
response.setMessage("Hello, this is the udp server!");
NetContext.getDispatcher().send(session, response, attachment);
NetContext.getRouter().send(session, response, attachment);
}
}
@@ -13,8 +13,8 @@
package com.zfoo.net.core.websocket.client;
import com.zfoo.net.dispatcher.model.anno.PacketReceiver;
import com.zfoo.net.packet.websocket.WebsocketHelloResponse;
import com.zfoo.net.router.receiver.PacketReceiver;
import com.zfoo.net.session.model.Session;
import com.zfoo.protocol.util.JsonUtils;
import org.slf4j.Logger;
@@ -46,7 +46,7 @@ public class WebsocketClientTest {
for (int i = 0; i < 1000; i++) {
ThreadUtils.sleep(2000);
NetContext.getDispatcher().send(session, request);
NetContext.getRouter().send(session, request);
}
ThreadUtils.sleep(Long.MAX_VALUE);
@@ -13,9 +13,9 @@
package com.zfoo.net.core.websocket.server;
import com.zfoo.net.NetContext;
import com.zfoo.net.dispatcher.model.anno.PacketReceiver;
import com.zfoo.net.packet.websocket.WebsocketHelloRequest;
import com.zfoo.net.packet.websocket.WebsocketHelloResponse;
import com.zfoo.net.router.receiver.PacketReceiver;
import com.zfoo.net.session.model.Session;
import com.zfoo.protocol.util.JsonUtils;
import org.slf4j.Logger;
@@ -38,7 +38,7 @@ public class WebsocketServerPacketController {
var response = new WebsocketHelloResponse();
response.setMessage("Hello, this is the websocket server!");
NetContext.getDispatcher().send(session, response);
NetContext.getRouter().send(session, response);
}
}
@@ -10,7 +10,7 @@
* See the License for the specific language governing permissions and limitations under the License.
*/
package com.zfoo.net.dispatcher;
package com.zfoo.net.router;
import com.zfoo.protocol.util.JsonUtils;
import org.slf4j.Logger;
@@ -10,7 +10,7 @@
* See the License for the specific language governing permissions and limitations under the License.
*/
package com.zfoo.net.dispatcher;
package com.zfoo.net.router;
import com.zfoo.protocol.util.JsonUtils;
import org.slf4j.Logger;
@@ -10,10 +10,10 @@
* See the License for the specific language governing permissions and limitations under the License.
*/
package com.zfoo.net.dispatcher;
package com.zfoo.net.router;
import com.zfoo.event.manager.EventBus;
import com.zfoo.net.dispatcher.manager.PacketSignal;
import com.zfoo.net.router.route.PacketSignal;
import com.zfoo.scheduler.util.TimeUtils;
import com.zfoo.util.ThreadUtils;
import org.junit.Ignore;