diff --git a/net/src/main/java/com/zfoo/net/core/gateway/GatewayServer.java b/net/src/main/java/com/zfoo/net/core/gateway/GatewayServer.java index 116c92a7..8ce51103 100644 --- a/net/src/main/java/com/zfoo/net/core/gateway/GatewayServer.java +++ b/net/src/main/java/com/zfoo/net/core/gateway/GatewayServer.java @@ -24,7 +24,6 @@ import io.netty.channel.socket.SocketChannel; import io.netty.handler.timeout.IdleStateHandler; import org.springframework.lang.Nullable; -import java.util.Objects; import java.util.function.BiFunction; /** @@ -33,15 +32,11 @@ import java.util.function.BiFunction; */ public class GatewayServer extends AbstractServer { - private final GatewayRouteHandler gatewayRouteHandler; + private BiFunction packetFilter; public GatewayServer(HostAndPort host, @Nullable BiFunction packetFilter) { - this(host, packetFilter, null); - } - - public GatewayServer(HostAndPort host, @Nullable BiFunction packetFilter, @Nullable GatewayRouteHandler gatewayRouteHandler) { super(host); - this.gatewayRouteHandler = Objects.requireNonNullElse(gatewayRouteHandler, new GatewayRouteHandler(packetFilter)); + this.packetFilter = packetFilter; } @Override @@ -49,6 +44,6 @@ public class GatewayServer extends AbstractServer { channel.pipeline().addLast(new IdleStateHandler(0, 0, 180)); channel.pipeline().addLast(new ServerIdleHandler()); channel.pipeline().addLast(new TcpCodecHandler()); - channel.pipeline().addLast(gatewayRouteHandler); + channel.pipeline().addLast(new GatewayRouteHandler(packetFilter)); } } diff --git a/net/src/main/java/com/zfoo/net/core/gateway/WebsocketGatewayServer.java b/net/src/main/java/com/zfoo/net/core/gateway/WebsocketGatewayServer.java index 67dfc809..21dab091 100644 --- a/net/src/main/java/com/zfoo/net/core/gateway/WebsocketGatewayServer.java +++ b/net/src/main/java/com/zfoo/net/core/gateway/WebsocketGatewayServer.java @@ -29,7 +29,6 @@ import io.netty.handler.stream.ChunkedWriteHandler; import io.netty.handler.timeout.IdleStateHandler; import org.springframework.lang.Nullable; -import java.util.Objects; import java.util.function.BiFunction; /** @@ -38,18 +37,13 @@ import java.util.function.BiFunction; */ public class WebsocketGatewayServer extends AbstractServer { - private final GatewayRouteHandler gatewayRouteHandler; + private BiFunction packetFilter; public WebsocketGatewayServer(HostAndPort host, @Nullable BiFunction packetFilter) { - this(host, packetFilter, null); - } - - public WebsocketGatewayServer(HostAndPort host, @Nullable BiFunction packetFilter, @Nullable GatewayRouteHandler gatewayRouteHandler) { super(host); - this.gatewayRouteHandler = Objects.requireNonNullElse(gatewayRouteHandler, new GatewayRouteHandler(packetFilter)); + this.packetFilter = packetFilter; } - @Override protected void initChannel(SocketChannel channel) { channel.pipeline().addLast(new IdleStateHandler(0, 0, 180)); @@ -60,6 +54,6 @@ public class WebsocketGatewayServer extends AbstractServer { channel.pipeline().addLast(new WebSocketServerProtocolHandler("/websocket")); channel.pipeline().addLast(new ChunkedWriteHandler()); channel.pipeline().addLast(new WebSocketCodecHandler()); - channel.pipeline().addLast(gatewayRouteHandler); + channel.pipeline().addLast(new GatewayRouteHandler(packetFilter)); } } diff --git a/net/src/main/java/com/zfoo/net/core/gateway/WebsocketSslGatewayServer.java b/net/src/main/java/com/zfoo/net/core/gateway/WebsocketSslGatewayServer.java index 4730ccce..92c71dea 100644 --- a/net/src/main/java/com/zfoo/net/core/gateway/WebsocketSslGatewayServer.java +++ b/net/src/main/java/com/zfoo/net/core/gateway/WebsocketSslGatewayServer.java @@ -36,7 +36,6 @@ import org.springframework.lang.Nullable; import javax.net.ssl.SSLException; import java.io.InputStream; -import java.util.Objects; import java.util.function.BiFunction; /** @@ -49,20 +48,16 @@ public class WebsocketSslGatewayServer extends AbstractServer { private SslContext sslContext; - private final GatewayRouteHandler gatewayRouteHandler; + private BiFunction packetFilter; public WebsocketSslGatewayServer(HostAndPort host, InputStream pem, InputStream key, @Nullable BiFunction packetFilter) { - this(host, pem, key, packetFilter, null); - } - - public WebsocketSslGatewayServer(HostAndPort host, InputStream pem, InputStream key, @Nullable BiFunction packetFilter, @Nullable GatewayRouteHandler gatewayRouteHandler) { super(host); try { this.sslContext = SslContextBuilder.forServer(pem, key).build(); } catch (SSLException e) { logger.error(ExceptionUtils.getMessage(e)); } - this.gatewayRouteHandler = Objects.requireNonNullElse(gatewayRouteHandler, new GatewayRouteHandler(packetFilter)); + this.packetFilter = packetFilter; } @Override @@ -76,6 +71,6 @@ public class WebsocketSslGatewayServer extends AbstractServer { channel.pipeline().addLast(new WebSocketServerProtocolHandler("/")); channel.pipeline().addLast(new ChunkedWriteHandler()); channel.pipeline().addLast(new WebSocketCodecHandler()); - channel.pipeline().addLast(gatewayRouteHandler); + channel.pipeline().addLast(new GatewayRouteHandler(packetFilter)); } } diff --git a/net/src/main/java/com/zfoo/net/core/http/HttpServer.java b/net/src/main/java/com/zfoo/net/core/http/HttpServer.java index 693ecf45..8df15faa 100644 --- a/net/src/main/java/com/zfoo/net/core/http/HttpServer.java +++ b/net/src/main/java/com/zfoo/net/core/http/HttpServer.java @@ -24,9 +24,7 @@ import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.stream.ChunkedWriteHandler; -import org.springframework.lang.Nullable; -import java.util.Objects; import java.util.function.Function; /** @@ -38,19 +36,11 @@ public class HttpServer extends AbstractServer { /** * http的地址解析器 */ - private final Function uriResolver; - - private final ServerRouteHandler serverRouteHandler; + private Function uriResolver; public HttpServer(HostAndPort host, Function uriResolver) { - this(host, uriResolver, null); - } - - - public HttpServer(HostAndPort host, Function uriResolver, @Nullable ServerRouteHandler serverRouteHandler) { super(host); this.uriResolver = uriResolver; - this.serverRouteHandler = Objects.requireNonNullElse(serverRouteHandler, new ServerRouteHandler()); } @Override @@ -59,6 +49,6 @@ public class HttpServer extends AbstractServer { channel.pipeline().addLast(new HttpObjectAggregator(16 * IOUtils.BYTES_PER_MB)); channel.pipeline().addLast(new ChunkedWriteHandler()); channel.pipeline().addLast(new HttpCodecHandler(uriResolver)); - channel.pipeline().addLast(serverRouteHandler); + channel.pipeline().addLast(new ServerRouteHandler()); } } diff --git a/net/src/main/java/com/zfoo/net/core/jprotobuf/JProtobufGatewayServer.java b/net/src/main/java/com/zfoo/net/core/jprotobuf/JProtobufGatewayServer.java index 8a93a58c..e28e80f7 100644 --- a/net/src/main/java/com/zfoo/net/core/jprotobuf/JProtobufGatewayServer.java +++ b/net/src/main/java/com/zfoo/net/core/jprotobuf/JProtobufGatewayServer.java @@ -24,7 +24,6 @@ import io.netty.channel.socket.SocketChannel; import io.netty.handler.timeout.IdleStateHandler; import org.springframework.lang.Nullable; -import java.util.Objects; import java.util.function.BiFunction; /** @@ -33,16 +32,11 @@ import java.util.function.BiFunction; */ public class JProtobufGatewayServer extends AbstractServer { - private final GatewayRouteHandler gatewayRouteHandler; + private final BiFunction packetFilter; public JProtobufGatewayServer(HostAndPort host, @Nullable BiFunction packetFilter) { - this(host, packetFilter, null); - } - - - public JProtobufGatewayServer(HostAndPort host, @Nullable BiFunction packetFilter, @Nullable GatewayRouteHandler gatewayRouteHandler) { super(host); - this.gatewayRouteHandler = Objects.requireNonNullElse(gatewayRouteHandler, new GatewayRouteHandler(packetFilter)); + this.packetFilter = packetFilter; } @Override @@ -50,7 +44,7 @@ public class JProtobufGatewayServer extends AbstractServer { channel.pipeline().addLast(new IdleStateHandler(0, 0, 180)); channel.pipeline().addLast(new ServerIdleHandler()); channel.pipeline().addLast(new JProtobufTcpCodecHandler()); - channel.pipeline().addLast(gatewayRouteHandler); + channel.pipeline().addLast(new GatewayRouteHandler(packetFilter)); } } diff --git a/net/src/main/java/com/zfoo/net/core/jprotobuf/JProtobufTcpClient.java b/net/src/main/java/com/zfoo/net/core/jprotobuf/JProtobufTcpClient.java index 52decbcc..98fc2ced 100644 --- a/net/src/main/java/com/zfoo/net/core/jprotobuf/JProtobufTcpClient.java +++ b/net/src/main/java/com/zfoo/net/core/jprotobuf/JProtobufTcpClient.java @@ -20,9 +20,6 @@ import com.zfoo.net.handler.idle.ClientIdleHandler; import com.zfoo.util.net.HostAndPort; import io.netty.channel.socket.SocketChannel; import io.netty.handler.timeout.IdleStateHandler; -import org.springframework.lang.Nullable; - -import java.util.Objects; /** * @author godotg @@ -30,15 +27,8 @@ import java.util.Objects; */ public class JProtobufTcpClient extends AbstractClient { - private final ClientRouteHandler clientRouteHandler; - public JProtobufTcpClient(HostAndPort host) { - this(host, null); - } - - public JProtobufTcpClient(HostAndPort host, @Nullable ClientRouteHandler clientRouteHandler) { super(host); - this.clientRouteHandler = Objects.requireNonNullElse(clientRouteHandler, new ClientRouteHandler()); } @Override @@ -46,6 +36,6 @@ public class JProtobufTcpClient extends AbstractClient { channel.pipeline().addLast(new IdleStateHandler(0, 0, 60)); channel.pipeline().addLast(new ClientIdleHandler()); channel.pipeline().addLast(new JProtobufTcpCodecHandler()); - channel.pipeline().addLast(clientRouteHandler); + channel.pipeline().addLast(new ClientRouteHandler()); } } diff --git a/net/src/main/java/com/zfoo/net/core/jprotobuf/JProtobufTcpServer.java b/net/src/main/java/com/zfoo/net/core/jprotobuf/JProtobufTcpServer.java index 27ff4f94..697a498c 100644 --- a/net/src/main/java/com/zfoo/net/core/jprotobuf/JProtobufTcpServer.java +++ b/net/src/main/java/com/zfoo/net/core/jprotobuf/JProtobufTcpServer.java @@ -20,9 +20,6 @@ import com.zfoo.net.handler.idle.ServerIdleHandler; import com.zfoo.util.net.HostAndPort; import io.netty.channel.socket.SocketChannel; import io.netty.handler.timeout.IdleStateHandler; -import org.springframework.lang.Nullable; - -import java.util.Objects; /** * @author godotg @@ -30,15 +27,8 @@ import java.util.Objects; */ public class JProtobufTcpServer extends AbstractServer { - private final ServerRouteHandler serverRouteHandler; - public JProtobufTcpServer(HostAndPort host) { - this(host, null); - } - - public JProtobufTcpServer(HostAndPort host, @Nullable ServerRouteHandler serverRouteHandler) { super(host); - this.serverRouteHandler = Objects.requireNonNullElse(serverRouteHandler, new ServerRouteHandler()); } @Override @@ -46,6 +36,6 @@ public class JProtobufTcpServer extends AbstractServer { channel.pipeline().addLast(new IdleStateHandler(0, 0, 180)); channel.pipeline().addLast(new ServerIdleHandler()); channel.pipeline().addLast(new JProtobufTcpCodecHandler()); - channel.pipeline().addLast(serverRouteHandler); + channel.pipeline().addLast(new ServerRouteHandler()); } } diff --git a/net/src/main/java/com/zfoo/net/core/json/JsonWebsocketClient.java b/net/src/main/java/com/zfoo/net/core/json/JsonWebsocketClient.java index 0d5b43c3..41dab18a 100644 --- a/net/src/main/java/com/zfoo/net/core/json/JsonWebsocketClient.java +++ b/net/src/main/java/com/zfoo/net/core/json/JsonWebsocketClient.java @@ -24,9 +24,6 @@ import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolConfig; import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler; import io.netty.handler.stream.ChunkedWriteHandler; -import org.springframework.lang.Nullable; - -import java.util.Objects; /** * @author godotg @@ -35,19 +32,12 @@ import java.util.Objects; public class JsonWebsocketClient extends AbstractClient { private WebSocketClientProtocolConfig webSocketClientProtocolConfig; - private final ClientRouteHandler clientRouteHandler; public JsonWebsocketClient(HostAndPort host, WebSocketClientProtocolConfig webSocketClientProtocolConfig) { - this(host, webSocketClientProtocolConfig, null); - } - - public JsonWebsocketClient(HostAndPort host, WebSocketClientProtocolConfig webSocketClientProtocolConfig, @Nullable ClientRouteHandler clientRouteHandler) { super(host); this.webSocketClientProtocolConfig = webSocketClientProtocolConfig; - this.clientRouteHandler = Objects.requireNonNullElse(clientRouteHandler, new ClientRouteHandler()); } - @Override public void initChannel(SocketChannel channel) { channel.pipeline().addLast(new HttpClientCodec(8 * IOUtils.BYTES_PER_KB, 16 * IOUtils.BYTES_PER_KB, 16 * IOUtils.BYTES_PER_KB)); @@ -55,7 +45,7 @@ public class JsonWebsocketClient extends AbstractClient { channel.pipeline().addLast(new WebSocketClientProtocolHandler(webSocketClientProtocolConfig)); channel.pipeline().addLast(new ChunkedWriteHandler()); channel.pipeline().addLast(new JsonWebSocketCodecHandler()); - channel.pipeline().addLast(clientRouteHandler); + channel.pipeline().addLast(new ClientRouteHandler()); } } diff --git a/net/src/main/java/com/zfoo/net/core/json/JsonWebsocketServer.java b/net/src/main/java/com/zfoo/net/core/json/JsonWebsocketServer.java index d723149f..abdaf309 100644 --- a/net/src/main/java/com/zfoo/net/core/json/JsonWebsocketServer.java +++ b/net/src/main/java/com/zfoo/net/core/json/JsonWebsocketServer.java @@ -23,9 +23,6 @@ import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.stream.ChunkedWriteHandler; -import org.springframework.lang.Nullable; - -import java.util.Objects; /** * @author godotg @@ -33,15 +30,8 @@ import java.util.Objects; */ public class JsonWebsocketServer extends AbstractServer { - private final ServerRouteHandler serverRouteHandler; - public JsonWebsocketServer(HostAndPort host) { - this(host, null); - } - - public JsonWebsocketServer(HostAndPort host, @Nullable ServerRouteHandler serverRouteHandler) { super(host); - this.serverRouteHandler = Objects.requireNonNullElse(serverRouteHandler, new ServerRouteHandler()); } @Override @@ -57,6 +47,6 @@ public class JsonWebsocketServer extends AbstractServer { channel.pipeline().addLast(new ChunkedWriteHandler()); // 编解码WebSocketFrame二进制协议 channel.pipeline().addLast(new JsonWebSocketCodecHandler()); - channel.pipeline().addLast(serverRouteHandler); + channel.pipeline().addLast(new ServerRouteHandler()); } } diff --git a/net/src/main/java/com/zfoo/net/core/tcp/TcpClient.java b/net/src/main/java/com/zfoo/net/core/tcp/TcpClient.java index fbccbf97..37dca9b5 100644 --- a/net/src/main/java/com/zfoo/net/core/tcp/TcpClient.java +++ b/net/src/main/java/com/zfoo/net/core/tcp/TcpClient.java @@ -20,25 +20,14 @@ import com.zfoo.net.handler.idle.ClientIdleHandler; import com.zfoo.util.net.HostAndPort; import io.netty.channel.socket.SocketChannel; import io.netty.handler.timeout.IdleStateHandler; -import org.springframework.lang.Nullable; - -import java.util.Objects; /** * @author godotg * @version 3.0 */ public class TcpClient extends AbstractClient { - - private final ClientRouteHandler clientRouteHandler; - public TcpClient(HostAndPort host) { - this(host, null); - } - - public TcpClient(HostAndPort host, @Nullable ClientRouteHandler clientRouteHandler) { super(host); - this.clientRouteHandler = Objects.requireNonNullElse(clientRouteHandler, new ClientRouteHandler()); } @Override @@ -48,7 +37,6 @@ public class TcpClient extends AbstractClient { channel.pipeline().addLast(new IdleStateHandler(0, 0, 60)); channel.pipeline().addLast(new ClientIdleHandler()); channel.pipeline().addLast(new TcpCodecHandler()); - channel.pipeline().addLast(clientRouteHandler); + channel.pipeline().addLast(new ClientRouteHandler()); } - } diff --git a/net/src/main/java/com/zfoo/net/core/tcp/TcpServer.java b/net/src/main/java/com/zfoo/net/core/tcp/TcpServer.java index adc1d2c1..5e9ef313 100644 --- a/net/src/main/java/com/zfoo/net/core/tcp/TcpServer.java +++ b/net/src/main/java/com/zfoo/net/core/tcp/TcpServer.java @@ -20,9 +20,6 @@ import com.zfoo.net.handler.idle.ServerIdleHandler; import com.zfoo.util.net.HostAndPort; import io.netty.channel.socket.SocketChannel; import io.netty.handler.timeout.IdleStateHandler; -import org.springframework.lang.Nullable; - -import java.util.Objects; /** * @author godotg @@ -30,15 +27,8 @@ import java.util.Objects; */ public class TcpServer extends AbstractServer { - private final ServerRouteHandler routeHandler; - public TcpServer(HostAndPort host) { - this(host, null); - } - - public TcpServer(HostAndPort host, @Nullable ServerRouteHandler serverRouteHandler) { super(host); - this.routeHandler = Objects.requireNonNullElse(serverRouteHandler, new ServerRouteHandler()); } @Override @@ -46,6 +36,6 @@ public class TcpServer extends AbstractServer { channel.pipeline().addLast(new IdleStateHandler(0, 0, 180)); channel.pipeline().addLast(new ServerIdleHandler()); channel.pipeline().addLast(new TcpCodecHandler()); - channel.pipeline().addLast(routeHandler); + channel.pipeline().addLast(new ServerRouteHandler()); } } diff --git a/net/src/main/java/com/zfoo/net/core/udp/UdpClient.java b/net/src/main/java/com/zfoo/net/core/udp/UdpClient.java index 7a42f6c8..c81fd654 100644 --- a/net/src/main/java/com/zfoo/net/core/udp/UdpClient.java +++ b/net/src/main/java/com/zfoo/net/core/udp/UdpClient.java @@ -27,9 +27,6 @@ import io.netty.channel.ChannelOption; import io.netty.channel.epoll.Epoll; import io.netty.channel.epoll.EpollDatagramChannel; import io.netty.channel.socket.nio.NioDatagramChannel; -import org.springframework.lang.Nullable; - -import java.util.Objects; /** * @author godotg @@ -37,15 +34,8 @@ import java.util.Objects; */ public class UdpClient extends AbstractClient { - private final ClientRouteHandler clientRouteHandler; - public UdpClient(HostAndPort host) { - this(host, null); - } - - public UdpClient(HostAndPort host, @Nullable ClientRouteHandler clientRouteHandler) { super(host); - this.clientRouteHandler = Objects.requireNonNullElse(clientRouteHandler, new ClientRouteHandler()); } @Override @@ -83,6 +73,6 @@ public class UdpClient extends AbstractClient { @Override protected void initChannel(Channel channel) { channel.pipeline().addLast(new UdpCodecHandler()); - channel.pipeline().addLast(clientRouteHandler); + channel.pipeline().addLast(new ClientRouteHandler()); } } diff --git a/net/src/main/java/com/zfoo/net/core/udp/UdpServer.java b/net/src/main/java/com/zfoo/net/core/udp/UdpServer.java index a5b19f6b..3ae45ad1 100644 --- a/net/src/main/java/com/zfoo/net/core/udp/UdpServer.java +++ b/net/src/main/java/com/zfoo/net/core/udp/UdpServer.java @@ -27,27 +27,16 @@ import io.netty.channel.socket.nio.NioDatagramChannel; import io.netty.util.concurrent.DefaultThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.lang.Nullable; - -import java.util.Objects; /** * @author godotg * @version 3.0 */ public class UdpServer extends AbstractServer { - private static final Logger logger = LoggerFactory.getLogger(UdpServer.class); - private final ServerRouteHandler serverRouteHandler; - public UdpServer(HostAndPort host) { - this(host, null); - } - - public UdpServer(HostAndPort host, @Nullable ServerRouteHandler serverRouteHandler) { super(host); - this.serverRouteHandler = Objects.requireNonNullElse(serverRouteHandler, new ServerRouteHandler()); } @Override @@ -78,6 +67,6 @@ public class UdpServer extends AbstractServer { @Override protected void initChannel(Channel channel) { channel.pipeline().addLast(new UdpCodecHandler()); - channel.pipeline().addLast(serverRouteHandler); + channel.pipeline().addLast(new ServerRouteHandler()); } } diff --git a/net/src/main/java/com/zfoo/net/core/websocket/WebsocketClient.java b/net/src/main/java/com/zfoo/net/core/websocket/WebsocketClient.java index 4c38ea90..076d40e1 100644 --- a/net/src/main/java/com/zfoo/net/core/websocket/WebsocketClient.java +++ b/net/src/main/java/com/zfoo/net/core/websocket/WebsocketClient.java @@ -24,9 +24,6 @@ import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolConfig; import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler; import io.netty.handler.stream.ChunkedWriteHandler; -import org.springframework.lang.Nullable; - -import java.util.Objects; /** @@ -37,16 +34,9 @@ public class WebsocketClient extends AbstractClient { private final WebSocketClientProtocolConfig webSocketClientProtocolConfig; - private final ClientRouteHandler clientRouteHandler; - public WebsocketClient(HostAndPort host, WebSocketClientProtocolConfig webSocketClientProtocolConfig) { - this(host, webSocketClientProtocolConfig, null); - } - - public WebsocketClient(HostAndPort host, WebSocketClientProtocolConfig webSocketClientProtocolConfig, @Nullable ClientRouteHandler clientRouteHandler) { super(host); this.webSocketClientProtocolConfig = webSocketClientProtocolConfig; - this.clientRouteHandler = Objects.requireNonNullElse(clientRouteHandler, new ClientRouteHandler()); } @Override @@ -56,6 +46,6 @@ public class WebsocketClient extends AbstractClient { channel.pipeline().addLast(new WebSocketClientProtocolHandler(webSocketClientProtocolConfig)); channel.pipeline().addLast(new ChunkedWriteHandler()); channel.pipeline().addLast(new WebSocketCodecHandler()); - channel.pipeline().addLast(clientRouteHandler); + channel.pipeline().addLast(new ClientRouteHandler()); } } diff --git a/net/src/main/java/com/zfoo/net/core/websocket/WebsocketServer.java b/net/src/main/java/com/zfoo/net/core/websocket/WebsocketServer.java index 3d061d07..cf5ee231 100644 --- a/net/src/main/java/com/zfoo/net/core/websocket/WebsocketServer.java +++ b/net/src/main/java/com/zfoo/net/core/websocket/WebsocketServer.java @@ -23,9 +23,6 @@ import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.stream.ChunkedWriteHandler; -import org.springframework.lang.Nullable; - -import java.util.Objects; /** * @author godotg @@ -33,16 +30,8 @@ import java.util.Objects; */ public class WebsocketServer extends AbstractServer { - private final ServerRouteHandler serverRouteHandler; - public WebsocketServer(HostAndPort host) { - this(host, null); - } - - public WebsocketServer(HostAndPort host, @Nullable ServerRouteHandler serverRouteHandler) { super(host); - this.serverRouteHandler = Objects.requireNonNullElse(serverRouteHandler, new ServerRouteHandler()); - } @Override @@ -58,7 +47,7 @@ public class WebsocketServer extends AbstractServer { channel.pipeline().addLast(new ChunkedWriteHandler()); // 编解码WebSocketFrame二进制协议 channel.pipeline().addLast(new WebSocketCodecHandler()); - channel.pipeline().addLast(serverRouteHandler); + channel.pipeline().addLast(new ServerRouteHandler()); } } diff --git a/net/src/main/java/com/zfoo/net/handler/BaseRouteHandler.java b/net/src/main/java/com/zfoo/net/handler/BaseRouteHandler.java index 622cd369..147662ca 100644 --- a/net/src/main/java/com/zfoo/net/handler/BaseRouteHandler.java +++ b/net/src/main/java/com/zfoo/net/handler/BaseRouteHandler.java @@ -26,8 +26,6 @@ import io.netty.util.AttributeKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.function.Consumer; - /** * @author godotg * @version 3.0 @@ -39,9 +37,6 @@ public abstract class BaseRouteHandler extends ChannelInboundHandlerAdapter { public static final AttributeKey SESSION_KEY = AttributeKey.valueOf("session"); - public Consumer sessionActiveConsumer; - public Consumer sessionInactiveConsumer; - public static Session initChannel(Channel channel) { var sessionAttr = channel.attr(SESSION_KEY); @@ -54,17 +49,6 @@ public abstract class BaseRouteHandler extends ChannelInboundHandlerAdapter { return session; } - protected void onSessionActive(Session session) { - if (sessionActiveConsumer != null) { - sessionActiveConsumer.accept(session); - } - } - - protected void onSessionInactive(Session session) { - if (sessionInactiveConsumer != null) { - sessionInactiveConsumer.accept(session); - } - } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { diff --git a/net/src/main/java/com/zfoo/net/handler/ClientRouteHandler.java b/net/src/main/java/com/zfoo/net/handler/ClientRouteHandler.java index e46225bb..22fb7d3f 100644 --- a/net/src/main/java/com/zfoo/net/handler/ClientRouteHandler.java +++ b/net/src/main/java/com/zfoo/net/handler/ClientRouteHandler.java @@ -37,7 +37,6 @@ public class ClientRouteHandler extends BaseRouteHandler { super.channelActive(ctx); // 客户端的session初始化在启动的时候已经做了,这边直接获取session var session = SessionUtils.getSession(ctx); - onSessionActive(session); EventBus.submit(ClientSessionActiveEvent.valueOf(session)); logger.info("client channel is active {}", SessionUtils.sessionInfo(ctx)); } @@ -53,7 +52,6 @@ public class ClientRouteHandler extends BaseRouteHandler { } NetContext.getSessionManager().removeClientSession(session); - onSessionInactive(session); EventBus.submit(ClientSessionInactiveEvent.valueOf(session)); // 如果是消费者inactive,还需要触发客户端消费者检查事件,以便重新连接 diff --git a/net/src/main/java/com/zfoo/net/handler/GatewayRouteHandler.java b/net/src/main/java/com/zfoo/net/handler/GatewayRouteHandler.java index 870cdd09..b4a38afd 100644 --- a/net/src/main/java/com/zfoo/net/handler/GatewayRouteHandler.java +++ b/net/src/main/java/com/zfoo/net/handler/GatewayRouteHandler.java @@ -35,7 +35,9 @@ import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.lang.Nullable; +import java.util.Objects; import java.util.function.BiFunction; /** @@ -47,10 +49,12 @@ public class GatewayRouteHandler extends ServerRouteHandler { private static final Logger logger = LoggerFactory.getLogger(GatewayRouteHandler.class); + public static final BiFunction DEFAULT_PACKER_FILTER = (session, packet) -> Boolean.FALSE; + private final BiFunction packetFilter; - public GatewayRouteHandler(BiFunction packetFilter) { - this.packetFilter = packetFilter; + public GatewayRouteHandler(@Nullable BiFunction packetFilter) { + this.packetFilter = Objects.requireNonNullElse(packetFilter, DEFAULT_PACKER_FILTER); } @@ -73,9 +77,8 @@ public class GatewayRouteHandler extends ServerRouteHandler { } // 过滤非法包 - if (packetFilter != null && packetFilter.apply(session, packet)) { - throw new IllegalArgumentException(StringUtils.format(" session:{}发送了一个非法包[{}]" - , SessionUtils.sessionSimpleInfo(ctx), JsonUtils.object2String(packet))); + if (packetFilter.apply(session, packet)) { + throw new IllegalArgumentException(StringUtils.format(" session:{}发送了一个非法包[{}]", SessionUtils.sessionSimpleInfo(ctx), JsonUtils.object2String(packet))); } var signalAttachment = (SignalAttachment) decodedPacketInfo.getAttachment(); diff --git a/net/src/main/java/com/zfoo/net/handler/ServerRouteHandler.java b/net/src/main/java/com/zfoo/net/handler/ServerRouteHandler.java index 5cebb5e1..623ca910 100644 --- a/net/src/main/java/com/zfoo/net/handler/ServerRouteHandler.java +++ b/net/src/main/java/com/zfoo/net/handler/ServerRouteHandler.java @@ -38,7 +38,6 @@ public class ServerRouteHandler extends BaseRouteHandler { var session = initChannel(ctx.channel()); NetContext.getSessionManager().addServerSession(session); logger.info("server channel is active {}", SessionUtils.sessionInfo(ctx)); - onSessionActive(session); EventBus.submit(ServerSessionActiveEvent.valueOf(session)); } @@ -52,7 +51,6 @@ public class ServerRouteHandler extends BaseRouteHandler { } NetContext.getSessionManager().removeServerSession(session); logger.warn("server channel is inactive {}", SessionUtils.sessionSimpleInfo(ctx)); - onSessionInactive(session); EventBus.submit(ServerSessionInactiveEvent.valueOf(session)); } }