diff --git a/doc/image/netty/aio-netty-0.png b/doc/image/netty/aio-netty-0.png new file mode 100644 index 00000000..8797fe45 Binary files /dev/null and b/doc/image/netty/aio-netty-0.png differ diff --git a/doc/image/netty/aio-netty-1.png b/doc/image/netty/aio-netty-1.png new file mode 100644 index 00000000..ee2f7ead Binary files /dev/null and b/doc/image/netty/aio-netty-1.png differ diff --git a/doc/image/netty/aio-netty-2.png b/doc/image/netty/aio-netty-2.png new file mode 100644 index 00000000..0216bbc3 Binary files /dev/null and b/doc/image/netty/aio-netty-2.png differ diff --git a/doc/image/netty/aio-netty-3.png b/doc/image/netty/aio-netty-3.png new file mode 100644 index 00000000..beffb0e9 Binary files /dev/null and b/doc/image/netty/aio-netty-3.png differ diff --git a/doc/image/netty/aio-netty-4.png b/doc/image/netty/aio-netty-4.png new file mode 100644 index 00000000..37daaea7 Binary files /dev/null and b/doc/image/netty/aio-netty-4.png differ diff --git a/doc/image/netty/aio-netty-5.png b/doc/image/netty/aio-netty-5.png new file mode 100644 index 00000000..4635353d Binary files /dev/null and b/doc/image/netty/aio-netty-5.png differ diff --git a/doc/image/netty/aio-nodejs-0.png b/doc/image/netty/aio-nodejs-0.png new file mode 100644 index 00000000..9bc45a8a Binary files /dev/null and b/doc/image/netty/aio-nodejs-0.png differ diff --git a/doc/image/netty/aio-nodejs-1.png b/doc/image/netty/aio-nodejs-1.png new file mode 100644 index 00000000..9e32d8c4 Binary files /dev/null and b/doc/image/netty/aio-nodejs-1.png differ diff --git a/doc/image/netty/aio-nodejs-2.png b/doc/image/netty/aio-nodejs-2.png new file mode 100644 index 00000000..79e63f64 Binary files /dev/null and b/doc/image/netty/aio-nodejs-2.png differ diff --git a/doc/image/netty/aio-nodejs-3.png b/doc/image/netty/aio-nodejs-3.png new file mode 100644 index 00000000..8bec42f4 Binary files /dev/null and b/doc/image/netty/aio-nodejs-3.png differ diff --git a/doc/netty/netty-nio.md b/doc/netty/netty-nio.md new file mode 100644 index 00000000..33bb24d3 --- /dev/null +++ b/doc/netty/netty-nio.md @@ -0,0 +1,12 @@ +阻塞和非阻塞是针对内核和上层应用来说的;同步和异步是针对应用和开发者来说的来说的。 +![Image text](../image/netty/aio-nodejs-0.png) +![Image text](../image/netty/aio-nodejs-1.png) +![Image text](../image/netty/aio-nodejs-2.png) +![Image text](../image/netty/aio-nodejs-3.png) + +![Image text](../image/netty/aio-netty-0.png) +![Image text](../image/netty/aio-netty-1.png) +![Image text](../image/netty/aio-netty-2.png) +![Image text](../image/netty/aio-netty-3.png) +![Image text](../image/netty/aio-netty-4.png) +![Image text](../image/netty/aio-netty-5.png) diff --git a/doc/netty/netty.md b/doc/netty/netty.md new file mode 100644 index 00000000..ad070ae2 --- /dev/null +++ b/doc/netty/netty.md @@ -0,0 +1,98 @@ +# 一、netty模型 + +## 1.reactor线程驱动模型 + +``` +bossGroup 线程池则只是在 Bind 某个端口后,获得其中一个线程作为 MainReactor, + 专门处理端口的 Accept 事件,每个端口对应一个 Boss 线程。 +workerGroup 线程池会被各个 SubReactor 和 Worker 线程充分利用。 + +``` + +## 2.Netty事件模型 + +``` +核心思路基于主从Reactor模型,一个NioEventLoopGroup包含n个NioEventLoop, +每一个NioEventLoop持有一个Selector和一个线程池(执行器EventExecute,其实是netty ForkJoinPool,并发度为n), +在选择器的NioEventLoop的run方法每次运行后,就会交给NioEventLoop中线程池的另外一个线程。 +``` + +## 3.I/O复用模型 + +- Netty 的非阻塞 I/O 的实现关键是基于 I/O 复用模型,这里用 Selector 对象表示,基于Java的NIO,netty会有多个Selector + +``` +DK的NIO类库有一个epoll死循环bug,它会导致Selector空轮询,IO线程CPU达到100%,严重影响系统运行。 +Netty的解决策略: +1.对Selector的select操作周期进行统计。 +2.每完成一次空的select操作进行一次计数。 +3.在某个周期内如果连续N次空轮询,则说明触发了JDK NIO的epoll死循环bug。 +4.创建新的Selector,将出现bug的Selector上的channel重新注册到新的Selector上。 +5.关闭bug的Selector,使用新的Selector进行替换。 +``` + +# 二、netty设置 + +## 1.netty有哪些参数设置 + + 1.TCP_NODELAY + 解释:是否启用Nagle算法,改算法将小的碎片数据连接成更大的报文来最小化所发送的报文的数量。 + 使用建议:如果需要发送一些较小的报文,则需要禁用该算法,从而最小化报文传输延时。 + 只有在网络通信非常大时(通常指已经到100k+/秒了),设置为false会有些许优势,因此建议大部分情况下均应设置为true。 + + 2.SO_REUSEADDR + 解释:是否复用处于TIME_WAIT状态连接的端口,适用于有大量处于TIME_WAIT状态连接的场景,如高并发量的Http短连接场景等。 + + 3.SO_SNDBUF +   解释:Socket参数,TCP数据发送缓冲区大小,即TCP发送滑动窗口,linux操作系统可使用命令:cat /proc/sys/net/ipv4/tcp_smem查询其大小。 + 缓冲区的大小决定了网络通信的吞吐量(网络吞吐量=缓冲区大小/网络时延)。 +   使用建议:缓冲区大小设为网络吞吐量达到带宽上限时的值,即缓冲区大小=网络带宽*网络时延。以千兆网卡为例进行计算,假设网络时延为1ms,缓冲区大小=1000Mb/s * 1ms = 128KB。 + + 4.SO_KEEPALIVE +   解释:是否使用TCP的心跳机制; +   使用建议:心跳机制由应用层自己实现; + +## 2.netty最佳的Boss线程连接数 + +``` +没有固定的连接数,一切还是看你的场景,连接数在满足传输吞吐量的情况下,越少越好。 + +2条连接时,只能有40k QPS。 + +48条连接,升到62k QPS,CPU烧了28% + +4条连接,QPS反而上升到68k ,而且CPU降到20%。 +``` + +## 3.netty线程池怎么设置 + +``` +Boss Group用于服务端处理建立连接的请求,WorkGroup用于处理I/O。 +EventLoopGroup的默认大小都是是2倍的CPU核数,但这并不是一个恒定的最佳数量,为了避免线程上下文切换,只要能满足要求,这个值其实越少越好。 +如果都是长连接,Boss Group平时很闲,好在它也只有忙起来才会多起线程,平时就只占1条。 + + +Netty线程的数量一般固定且较少,所以很怕线程被堵塞,比如同步的数据库查询, +比如下游的服务调用(又来罗嗦,future.get()式的异步在执行future.get()时还是堵住当前线程的啊)。 +所以,此时就要把处理放到一个业务线程池里操作,即使要付出线程上下文切换的代价 +``` + +## 4.netty的JVM参数设置 + +``` +-Dio.netty.leakDetectionLevel=disabled把检测关掉 +堆外内存大小 +``` + +## 5.netty注意事项 + +``` +1. ctx.writeAndFlush() 与 channel.writeAndFlush()的区别在于,channel要经过整条Pipeline,而ctx直接找下一个outboundHandler。 + +2. channel.writeAndFlush(buf, channel.voidPromise() ) +writeAndFlush不管你用不用默认构造返回一个Promise(Future),有点浪费内存。没有用的话,用一个公共的 voidPromise ,减少大家花费。不过低版本的Netty不能用。 + +3. 空闲连接管理,因为刚才说的ctx.writeAndFlush()不经Pipeline,所以只监控读空闲就够了。否则每次请求都要READ/WRITE/ALL IDEL三个值算一遍,白白消耗。 + +4. Handler能共用就标上Shareable Annotation然后共用,不要每个Channel建一个。 +``` diff --git a/net/src/test/java/com/zfoo/net/base/aio/client/AioClientTest.java b/net/src/test/java/com/zfoo/net/base/aio/client/AioClientTest.java new file mode 100644 index 00000000..4bde35f4 --- /dev/null +++ b/net/src/test/java/com/zfoo/net/base/aio/client/AioClientTest.java @@ -0,0 +1,57 @@ +package com.zfoo.net.base.aio.client; + + +import com.zfoo.util.ThreadUtils; +import org.junit.Ignore; +import org.junit.Test; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.channels.AsynchronousSocketChannel; +import java.util.concurrent.CountDownLatch; + +public class AioClientTest implements Runnable { + + private AsynchronousSocketChannel client; + private String host; + private int port; + private CountDownLatch latch; + + public AioClientTest(String host, int port, int latchNum) { + this.host = host; + this.port = port; + this.latch = new CountDownLatch(latchNum); + } + + public void init() { + try { + client = AsynchronousSocketChannel.open(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Override + public void run() { + ConnectCompletionHandler handler = new ConnectCompletionHandler(client, latch); + client.connect(new InetSocketAddress(host, port), handler, handler); + try { + latch.await(); + client.close(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Ignore + @Test + public void clientTest() { + AioClientTest aioClient = new AioClientTest("127.0.0.1", 9999, 1); + aioClient.init(); + new Thread(aioClient, "client").start(); + ThreadUtils.sleep(Long.MAX_VALUE); + } + +} diff --git a/net/src/test/java/com/zfoo/net/base/aio/client/ConnectCompletionHandler.java b/net/src/test/java/com/zfoo/net/base/aio/client/ConnectCompletionHandler.java new file mode 100644 index 00000000..83b5d3a3 --- /dev/null +++ b/net/src/test/java/com/zfoo/net/base/aio/client/ConnectCompletionHandler.java @@ -0,0 +1,55 @@ +package com.zfoo.net.base.aio.client; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousSocketChannel; +import java.nio.channels.CompletionHandler; +import java.util.concurrent.CountDownLatch; + +public class ConnectCompletionHandler implements CompletionHandler { + + private AsynchronousSocketChannel client; + private CountDownLatch latch; + + public ConnectCompletionHandler(AsynchronousSocketChannel channel, CountDownLatch latch) { + this.client = channel; + this.latch = latch; + } + + @Override + public void completed(Void result, ConnectCompletionHandler attachment) { + byte[] req = "QUERY TIME ORDER!".getBytes(); + ByteBuffer writeBuffer = ByteBuffer.allocate(req.length); + writeBuffer.put(req); + writeBuffer.flip(); + client.write(writeBuffer, writeBuffer, new CompletionHandler() { + @Override + public void completed(Integer result, ByteBuffer buffer) { + if (buffer.hasRemaining()) { + client.write(buffer, buffer, this); + } else { + ByteBuffer readBuffer = ByteBuffer.allocate(1024); + client.read(readBuffer, readBuffer, new ReadCompletionHandler(client, latch)); + } + } + + @Override + public void failed(Throwable exc, ByteBuffer attachment) { + try { + client.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + }); + } + + @Override + public void failed(Throwable exc, ConnectCompletionHandler attachment) { + try { + client.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } +} diff --git a/net/src/test/java/com/zfoo/net/base/aio/client/ReadCompletionHandler.java b/net/src/test/java/com/zfoo/net/base/aio/client/ReadCompletionHandler.java new file mode 100644 index 00000000..269e6b1b --- /dev/null +++ b/net/src/test/java/com/zfoo/net/base/aio/client/ReadCompletionHandler.java @@ -0,0 +1,46 @@ +package com.zfoo.net.base.aio.client; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousSocketChannel; +import java.nio.channels.CompletionHandler; +import java.util.concurrent.CountDownLatch; + +/** + * Created by Administrator on 2017/5/19 0019. + */ +public class ReadCompletionHandler implements CompletionHandler { + + private AsynchronousSocketChannel client; + private CountDownLatch latch; + + public ReadCompletionHandler(AsynchronousSocketChannel channel, CountDownLatch latch) { + this.client = channel; + this.latch = latch; + } + + @Override + public void completed(Integer result, ByteBuffer attachment) { + attachment.flip(); + byte[] bytes = new byte[attachment.remaining()]; + attachment.get(bytes); + String body; + try { + body = new String(bytes, "UTF-8"); + System.out.println("Now is :" + body); + latch.countDown(); + } catch (UnsupportedEncodingException e) { + e.printStackTrace(); + } + } + + @Override + public void failed(Throwable exc, ByteBuffer attachment) { + try { + client.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } +} diff --git a/net/src/test/java/com/zfoo/net/base/aio/server/AcceptCompletionHandler.java b/net/src/test/java/com/zfoo/net/base/aio/server/AcceptCompletionHandler.java new file mode 100644 index 00000000..4f5d5eac --- /dev/null +++ b/net/src/test/java/com/zfoo/net/base/aio/server/AcceptCompletionHandler.java @@ -0,0 +1,20 @@ +package com.zfoo.net.base.aio.server; + +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousSocketChannel; +import java.nio.channels.CompletionHandler; + +public class AcceptCompletionHandler implements CompletionHandler { + @Override + public void completed(AsynchronousSocketChannel result, AioServerTest attachment) { + attachment.getAsynchronousServerSocketChannel().accept(attachment, this); + ByteBuffer buffer = ByteBuffer.allocate(1024); + result.read(buffer, buffer, new ReadCompletionHandler(result)); + } + + @Override + public void failed(Throwable exc, AioServerTest attachment) { + exc.printStackTrace(); + attachment.getLatch().countDown(); + } +} diff --git a/net/src/test/java/com/zfoo/net/base/aio/server/AioServerTest.java b/net/src/test/java/com/zfoo/net/base/aio/server/AioServerTest.java new file mode 100644 index 00000000..83c53261 --- /dev/null +++ b/net/src/test/java/com/zfoo/net/base/aio/server/AioServerTest.java @@ -0,0 +1,72 @@ +package com.zfoo.net.base.aio.server; + + +import com.zfoo.util.ThreadUtils; +import org.junit.Ignore; +import org.junit.Test; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.channels.AsynchronousServerSocketChannel; +import java.util.concurrent.CountDownLatch; + +public class AioServerTest implements Runnable { + + private int port; + + private CountDownLatch latch; + + private AsynchronousServerSocketChannel asynchronousServerSocketChannel; + + public AioServerTest(int port, int latchNum) { + this.port = port; + latch = new CountDownLatch(latchNum); + } + + public void init() { + try { + asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open(); + asynchronousServerSocketChannel.bind(new InetSocketAddress(port)); + System.out.println("The Time server is start in port:" + port); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Override + public void run() { + asynchronousServerSocketChannel.accept(this, new AcceptCompletionHandler()); + try { + latch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + } + + public AsynchronousServerSocketChannel getAsynchronousServerSocketChannel() { + return asynchronousServerSocketChannel; + } + + public void setAsynchronousServerSocketChannel(AsynchronousServerSocketChannel asynchronousServerSocketChannel) { + this.asynchronousServerSocketChannel = asynchronousServerSocketChannel; + } + + public CountDownLatch getLatch() { + return latch; + } + + public void setLatch(CountDownLatch latch) { + this.latch = latch; + } + + @Ignore + @Test + public void serverTest() { + AioServerTest aioServer = new AioServerTest(9999, 1); + aioServer.init(); + new Thread(aioServer, "server").start(); + ThreadUtils.sleep(Long.MAX_VALUE); + } + +} diff --git a/net/src/test/java/com/zfoo/net/base/aio/server/ReadCompletionHandler.java b/net/src/test/java/com/zfoo/net/base/aio/server/ReadCompletionHandler.java new file mode 100644 index 00000000..620493db --- /dev/null +++ b/net/src/test/java/com/zfoo/net/base/aio/server/ReadCompletionHandler.java @@ -0,0 +1,74 @@ +package com.zfoo.net.base.aio.server; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousSocketChannel; +import java.nio.channels.CompletionHandler; +import java.util.Date; + +public class ReadCompletionHandler implements CompletionHandler { + + private AsynchronousSocketChannel channel; + + public ReadCompletionHandler(AsynchronousSocketChannel channel) { + this.channel = channel; + } + + @Override + public void completed(Integer result, ByteBuffer attachment) { + + attachment.flip(); + byte[] body = new byte[attachment.remaining()]; + attachment.get(body); + String req = null; + + try { + req = new String(body, "UTF-8"); + } catch (UnsupportedEncodingException e) { + e.printStackTrace(); + } + + System.out.println("The time server receive order:" + req); + + String currentTime = "Current time is " + new Date().toString() + "hello dddddddddd"; + + doWrite(currentTime); + } + + @Override + public void failed(Throwable exc, ByteBuffer attachment) { + try { + this.channel.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + public void doWrite(String currentTime) { + if (currentTime != null && currentTime.trim().length() > 0) { + byte[] bytes = currentTime.getBytes(); + ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); + writeBuffer.put(bytes); + writeBuffer.flip(); + channel.write(writeBuffer, writeBuffer, new CompletionHandler() { + @Override + public void completed(Integer result, ByteBuffer buffer) { + //如果没有发送完成,则继续发送 + if (writeBuffer.hasRemaining()) { + channel.write(buffer); + } + } + + @Override + public void failed(Throwable exc, ByteBuffer buffer) { + try { + channel.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + }); + } + } +} diff --git a/net/src/test/java/com/zfoo/net/base/bio/TcpTest.java b/net/src/test/java/com/zfoo/net/base/bio/TcpTest.java new file mode 100644 index 00000000..7ce497dd --- /dev/null +++ b/net/src/test/java/com/zfoo/net/base/bio/TcpTest.java @@ -0,0 +1,60 @@ +package com.zfoo.net.base.bio; + +import com.zfoo.protocol.util.IOUtils; +import org.junit.Ignore; +import org.junit.Test; + +import java.io.*; +import java.net.ServerSocket; +import java.net.Socket; + + +@Ignore +public class TcpTest { + + @Test + public void serverTest() throws IOException { + // 1.穿件服务器,指定端口 + ServerSocket server = null; + BufferedWriter bufferedWriter = null; + try { + server = new ServerSocket(9999); + // 2.接受客户端连接,阻塞式 + Socket socket = server.accept(); + System.out.println("hello http!!!!!!"); + + // 3.发送数据+接受数据 + String message = "welocme to internet!!!"; + bufferedWriter = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())); + bufferedWriter.write(message); + bufferedWriter.newLine(); + bufferedWriter.flush(); + } finally { + IOUtils.closeIO(bufferedWriter, server); + } + } + + @Test + public void clientTest() throws IOException { + // 1.创建客户端,必须指定服务器+端口,此时就在连接 + Socket client = null; + BufferedReader bufferedReader = null; + + try { + // 同一个协议下TCP,端口不能重复,UDP和TCP可以公用一个端口 + // 端口两个字节,1024一下为系统级别的端口,不能使用 + client = new Socket("localhost", 9999); + + // 2.接受数据 + bufferedReader = new BufferedReader(new InputStreamReader(client.getInputStream())); + String receiveMes = bufferedReader.readLine(); + System.out.println(receiveMes); + + // 3.发送数据 + client.getOutputStream().write(new String("This is client!").getBytes()); + client.getOutputStream().flush(); + } finally { + IOUtils.closeIO(client.getOutputStream(), client); + } + } +} diff --git a/net/src/test/java/com/zfoo/net/base/bio/UdpTest.java b/net/src/test/java/com/zfoo/net/base/bio/UdpTest.java new file mode 100644 index 00000000..09faecd7 --- /dev/null +++ b/net/src/test/java/com/zfoo/net/base/bio/UdpTest.java @@ -0,0 +1,60 @@ +package com.zfoo.net.base.bio; + +import com.zfoo.protocol.util.IOUtils; +import org.junit.Ignore; +import org.junit.Test; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetSocketAddress; + +@Ignore +public class UdpTest { + + @Test + public void serverTest() throws IOException { + // 1.创建服务端+端口 + DatagramSocket server = null; + try { + server = new DatagramSocket(8000); + // 2.准备接收器 + byte[] container = new byte[1024]; + // 3.封装成包DatagramPacket + DatagramPacket packet = new DatagramPacket(container, + container.length); + // 4.接收数据,接收数据可以阻塞 + server.receive(packet); + // 分析数据 + byte[] data = packet.getData(); + int length = packet.getLength(); + System.out.println(new String(data, 0, length)); + } finally { + // 5.释放资源 + server.close(); + } + + } + + @Test + public void clientTest() throws IOException { + // 1.创建客户端+端口 + DatagramSocket client = null; + try { + client = new DatagramSocket(6666); + // 2.准备数据 + String message = "udp 编程"; + byte[] data = message.getBytes(); + // 3.打包DatagramPacket,发送的地点及端口 + DatagramPacket packet = new DatagramPacket(data, data.length, + new InetSocketAddress("localhost", 8888)); + // 4.发送 + client.send(packet); + // 5.释放资源 + client.close(); + } finally { + IOUtils.closeIO(client); + } + } + +} diff --git a/net/src/test/java/com/zfoo/net/base/bio/UrlTest.java b/net/src/test/java/com/zfoo/net/base/bio/UrlTest.java new file mode 100644 index 00000000..8465a6ee --- /dev/null +++ b/net/src/test/java/com/zfoo/net/base/bio/UrlTest.java @@ -0,0 +1,90 @@ +package com.zfoo.net.base.bio; + +import com.zfoo.protocol.util.IOUtils; +import org.junit.Ignore; +import org.junit.Test; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.URL; +import java.net.UnknownHostException; + + +@Ignore +public class UrlTest { + + // 不包含端口的ip地址 + @Test + public void inetAddressTest() throws UnknownHostException { + // 获取本机的ip + InetAddress ipLocal = InetAddress.getLocalHost(); + System.out.println("本机地址相关信息:"); + System.out.println("本机计算机名称:" + ipLocal.getHostName()); + System.out.println("本机ip:" + ipLocal.getHostAddress()); + + System.out.println("**************************************************************************"); + + // 获取远程ip地址 + InetAddress ipBaidu = InetAddress.getByName("www.baidu.com"); + // 如果解析不到name和address都为ip + // InetAddress ipBaidu=InetAddress.getByName("115.239.211.112"); + System.out.println("远程地址相关信息:"); + System.out.println("远程计算机名称:" + ipBaidu.getHostName()); + System.out.println("远程ip:" + ipBaidu.getHostAddress()); + } + + // 包含端口的ip地址 + @Test + public void inetSocketAddressToInetAddressTest() { + // localhost=127.0.0.1,本机ip + InetSocketAddress address = new InetSocketAddress("www.baidu.com", 9999); + System.out.println(address.getHostName()); + System.out.println(address.getPort()); + + // InetSocketAddress和InetAddress的相互转换 + InetAddress ip = address.getAddress(); + System.out.println(ip.getHostName());// 计算机名称 + System.out.println(ip.getHostAddress());// ip + } + + @Test + public void urlTest() { + // URI统一资源标识符,用来唯一的标识一个资源 + // URL统一资源定位符,一种具体的URI + // 四部分组成:协议 存放资源的主机名称 端口 资源文件名称 + URL url = null; + + // 读取网页的资源 + InputStream inputStream = null; + InputStreamReader inputStreamReader = null; + BufferedReader bufferedReader = null; + try { + // 绝对路径 + url = new URL("https://pic3.zhimg.com/v2-473b053a376f6752da5d5b06a31b7304_b.jpg"); + System.out.println(url.getProtocol()); + System.out.println(url.getHost()); + System.out.println(url.getPort()); + System.out.println(url.getFile());// 资源 + System.out.println(url.getPath());// 相对路径 + System.out.println(url.getRef());// 锚点 + System.out.println(url.getQuery());// 存在锚点,参数为null;如果不存在锚点,返回正确 + + inputStream = url.openStream(); + inputStreamReader = new InputStreamReader(inputStream, "utf-8"); + bufferedReader = new BufferedReader(inputStreamReader); + + String message; + while ((message = bufferedReader.readLine()) != null) { + System.out.println(message);// 输出到控制台 + } + } catch (IOException e) { + e.printStackTrace(); + } finally { + IOUtils.closeIO(bufferedReader); + } + } +} diff --git a/net/src/test/java/com/zfoo/net/base/bio/chat/BioClientTest.java b/net/src/test/java/com/zfoo/net/base/bio/chat/BioClientTest.java new file mode 100644 index 00000000..d41adab1 --- /dev/null +++ b/net/src/test/java/com/zfoo/net/base/bio/chat/BioClientTest.java @@ -0,0 +1,139 @@ +package com.zfoo.net.base.bio.chat; + +import com.zfoo.protocol.util.IOUtils; +import com.zfoo.util.ThreadUtils; +import org.junit.Ignore; +import org.junit.Test; + +import java.io.*; +import java.net.Socket; +import java.net.UnknownHostException; +import java.util.Scanner; + +public class BioClientTest { + private String name; + + private class SendThread implements Runnable { + // 控制台输入流 + private BufferedReader console; + // 管道输出流 + private DataOutputStream dataOut; + // 控制线程 + private boolean isRunning; + + private SendThread() { + console = new BufferedReader(new InputStreamReader(System.in)); + isRunning = true; + } + + public SendThread(Socket client) { + this(); + try { + dataOut = new DataOutputStream(client.getOutputStream()); + send(name); + } catch (IOException e) { + e.printStackTrace(); + isRunning = false; + IOUtils.closeIO(console, dataOut); + } + } + + private String getMessageFromConsole() { + try { + System.out.println("please input message:"); + return console.readLine(); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + + public void send(String message) { + if (null != message) { + try { + dataOut.writeUTF(message); + dataOut.flush(); + } catch (IOException e) { + isRunning = false; + IOUtils.closeIO(dataOut, console); + } + } + } + + @Override + public void run() { + while (isRunning) { + send(getMessageFromConsole()); + } + } + } + + private static class ReceiveThread implements Runnable { + // 输入流 + private DataInputStream dataIn; + // 线程标识 + private boolean isRunning = true; + + + public ReceiveThread(Socket client) { + try { + dataIn = new DataInputStream(client.getInputStream()); + } catch (IOException e) { + isRunning = false; + IOUtils.closeIO(dataIn); + } + } + + private String receive() { + try { + return dataIn.readUTF(); + } catch (IOException e) { + isRunning = false; + IOUtils.closeIO(dataIn); + } + return null; + } + + @Override + public void run() { + while (isRunning) { + System.out.println(receive()); + } + } + } + + public void launchClient(String name) { + // 1.创建客户端,必须指定服务器+端口,此时就在连接 + Socket client = null; + Thread sendThread = null; + Thread receiveThread = null; + this.name = name; + try { + + client = new Socket("localhost", 9999);// 系统自动分配客户端的端口号 + // 2.发送数据+接受数据 + sendThread = new Thread(new SendThread(client)); + receiveThread = new Thread(new ReceiveThread(client)); + sendThread.start(); + receiveThread.start(); + + } catch (UnknownHostException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + + @Ignore + @Test + public void clientTest() { + System.out.println("请输入名称:"); + String name = new Scanner(System.in).nextLine(); + + new BioClientTest().launchClient(name); + ThreadUtils.sleep(Long.MAX_VALUE); + } + + +} diff --git a/net/src/test/java/com/zfoo/net/base/bio/chat/BioServerTest.java b/net/src/test/java/com/zfoo/net/base/bio/chat/BioServerTest.java new file mode 100644 index 00000000..6469cf21 --- /dev/null +++ b/net/src/test/java/com/zfoo/net/base/bio/chat/BioServerTest.java @@ -0,0 +1,131 @@ +package com.zfoo.net.base.bio.chat; + +import com.zfoo.protocol.util.IOUtils; +import com.zfoo.util.ThreadUtils; +import org.junit.Ignore; +import org.junit.Test; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.ArrayList; +import java.util.List; + +public class BioServerTest { + + private List listClient = new ArrayList(); + + private class ServerChannel implements Runnable { + // 发送数据 + private DataOutputStream dataOut; + // 接受数据 + private DataInputStream dataIn; + private String name; + private boolean isRunning = true; + + public ServerChannel(Socket client) { + try { + dataOut = new DataOutputStream(client.getOutputStream()); + dataIn = new DataInputStream(client.getInputStream()); + this.name = dataIn.readUTF(); + System.out.println(name); + send("欢迎你进入聊天室!"); + sendToAll(this.name + "进入聊天室!"); + + } catch (IOException e) { + quit(); + } + + } + + private String receive() { + String message = null; + + try { + message = dataIn.readUTF(); + } catch (IOException e) { + quit(); + } + return message; + } + + private void send(String message) { + if (message == null || message.equals("")) + return; + + try { + System.out.println(message); + dataOut.writeUTF(message); + dataOut.flush(); + } catch (IOException e) { + quit(); + } + + } + + private void sendToAll(String message) { + if (message.startsWith("@") && message.contains(":")) { + String name = message.substring(1, message.indexOf(":")); + String content = message.substring(message.indexOf(":") + 1); + for (ServerChannel other : listClient) { + if (other.name.equals(name)) { + other.send(this.name + "悄悄对你说:" + content); + } + } + } else { + + for (ServerChannel other : listClient) { + if (other == this) { + continue; + } + other.send(message); + } + } + } + + private void quit() { + IOUtils.closeIO(dataOut); + isRunning = false; + listClient.remove(this); + } + + @Override + public void run() { + while (isRunning) { + sendToAll(receive()); + } + } + + } + + public void launchServer() { + // 1.穿件服务器,指定端口 + ServerSocket server = null; + try { + server = new ServerSocket(9999); + while (true) { + // 2.接受客户端连接,阻塞式 + Socket client = server.accept(); + ServerChannel serverChannel = new ServerChannel(client); + // 3.发送数据+接受数据 + Thread clienThread = new Thread(serverChannel); + listClient.add(serverChannel); + clienThread.start(); + System.out + .println("server started! Connect to client successfully"); + } + + } catch (IOException e) { + } + } + + @Ignore + @Test + public void bioTest() { + new BioServerTest().launchServer(); + ThreadUtils.sleep(Long.MAX_VALUE); + } + +} diff --git a/net/src/test/java/com/zfoo/net/base/http/AsyncHttpTest.java b/net/src/test/java/com/zfoo/net/base/http/AsyncHttpTest.java new file mode 100644 index 00000000..f2ab93eb --- /dev/null +++ b/net/src/test/java/com/zfoo/net/base/http/AsyncHttpTest.java @@ -0,0 +1,42 @@ +package com.zfoo.net.base.http; + +import com.zfoo.util.ThreadUtils; +import org.junit.Ignore; +import org.junit.Test; + +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.function.Consumer; + + +@Ignore +public class AsyncHttpTest { + + + @Test + public void test() { + HttpClient client = HttpClient.newBuilder() + .executor(Executors.newSingleThreadExecutor()) + .build(); + + HttpRequest request = HttpRequest + .newBuilder(URI.create("https://www.baidu.com")) + .GET() + .build(); + + HttpResponse.BodyHandler responseBodyHandler = HttpResponse.BodyHandlers.ofString(); + CompletableFuture> sendAsync = client.sendAsync(request, responseBodyHandler); + sendAsync.thenApply(t -> t.body()).thenAccept(new Consumer() { + @Override + public void accept(String s) { + System.out.println(s); + } + }); + ThreadUtils.sleep(Integer.MAX_VALUE); + } + +} diff --git a/net/src/test/java/com/zfoo/net/base/netty/echoserver1/EchoClientHandler.java b/net/src/test/java/com/zfoo/net/base/netty/echoserver1/EchoClientHandler.java new file mode 100644 index 00000000..512fb8e9 --- /dev/null +++ b/net/src/test/java/com/zfoo/net/base/netty/echoserver1/EchoClientHandler.java @@ -0,0 +1,42 @@ +package com.zfoo.net.base.netty.echoserver1; + +import com.zfoo.protocol.util.StringUtils; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; + +public class EchoClientHandler extends ChannelInboundHandlerAdapter { + + public static final String ECHO_REQUEST = "Welcome to netty!" + StringUtils.DOLLAR; + + private int count; + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + ByteBuf firstMessage; + byte[] req = ECHO_REQUEST.getBytes(); + for (int i = 0; i < 100; i++) { + ctx.writeAndFlush(Unpooled.copiedBuffer(req)); + } + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + String body = (String) msg; + System.out.println("This is " + count + " times receive sever :[" + body + "]"); + count++; + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + ctx.flush(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + ctx.close(); + } + + +} diff --git a/net/src/test/java/com/zfoo/net/base/netty/echoserver1/EchoClientTest.java b/net/src/test/java/com/zfoo/net/base/netty/echoserver1/EchoClientTest.java new file mode 100644 index 00000000..852d8550 --- /dev/null +++ b/net/src/test/java/com/zfoo/net/base/netty/echoserver1/EchoClientTest.java @@ -0,0 +1,59 @@ +package com.zfoo.net.base.netty.echoserver1; + +import com.zfoo.protocol.util.StringUtils; +import com.zfoo.util.ThreadUtils; +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.DelimiterBasedFrameDecoder; +import io.netty.handler.codec.string.StringDecoder; +import org.junit.Ignore; +import org.junit.Test; + +public class EchoClientTest { + + + public void connect(int port, String host) { + EventLoopGroup group = new NioEventLoopGroup(); + Bootstrap bootstrap = new Bootstrap(); + bootstrap.group(group).channel(NioSocketChannel.class) + .option(ChannelOption.TCP_NODELAY, true) + .handler(new ChildChannelHandler()); + ChannelFuture future = null; + try { + future = bootstrap.connect(host, port).sync(); + future.channel().closeFuture().sync(); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + group.shutdownGracefully(); + } + } + + private class ChildChannelHandler extends ChannelInitializer { + @Override + protected void initChannel(SocketChannel channel) throws Exception { + ByteBuf delimiter = Unpooled.copiedBuffer(StringUtils.DOLLAR.getBytes()); + channel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter)); + channel.pipeline().addLast(new StringDecoder()); + channel.pipeline().addLast(new EchoClientHandler()); + } + } + + @Ignore + @Test + public void clientTest() { + new EchoClientTest().connect(9999, "127.0.0.1"); + System.out.println("hello"); + ThreadUtils.sleep(Long.MAX_VALUE); + } + + +} diff --git a/net/src/test/java/com/zfoo/net/base/netty/echoserver1/EchoServerTest.java b/net/src/test/java/com/zfoo/net/base/netty/echoserver1/EchoServerTest.java new file mode 100644 index 00000000..86741c65 --- /dev/null +++ b/net/src/test/java/com/zfoo/net/base/netty/echoserver1/EchoServerTest.java @@ -0,0 +1,83 @@ +package com.zfoo.net.base.netty.echoserver1; + +import com.zfoo.protocol.util.StringUtils; +import com.zfoo.util.ThreadUtils; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.DelimiterBasedFrameDecoder; +import io.netty.handler.codec.string.StringDecoder; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import org.junit.Ignore; +import org.junit.Test; + +/** + * 分隔符通信服务器设计 + * + * @author SunInsanity + * @version 1.0 + * @since 2017 05.22 18:23 + */ +public class EchoServerTest { + + private int port; + + public EchoServerTest(int port) { + this.port = port; + } + + public void init() { + //配置服务端nio线程组 + EventLoopGroup bossGroup = new NioEventLoopGroup();//服务端接受客户端连接 + EventLoopGroup workerGroup = new NioEventLoopGroup();//SocketChannel的网络读写 + try { + System.out.println("sdfs"); + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) + .option(ChannelOption.SO_BACKLOG, 100).handler(new LoggingHandler(LogLevel.INFO)) + .childHandler(new ChildChannelHandler()); + + //绑定端口,同步等待成功 + ChannelFuture future = bootstrap.bind(port).sync(); + //等待服务端监听端口关闭 + future.channel().closeFuture().sync(); + System.out.println("sdfs"); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + //优雅的退出,释放线程池资源 + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + } + } + + private class ChildChannelHandler extends ChannelInitializer { + @Override + protected void initChannel(SocketChannel channel) throws Exception { + ByteBuf delimiter = Unpooled.copiedBuffer(StringUtils.DOLLAR.getBytes()); + channel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter)); + channel.pipeline().addLast(new StringDecoder()); + channel.pipeline().addLast(new EchoSeverHandler()); + } + } + + + @Ignore + @Test + public void serverTest() { + System.out.println("hello"); + EchoServerTest server = new EchoServerTest(9999); + server.init(); + System.out.println("hello"); + ThreadUtils.sleep(Long.MAX_VALUE); + } + +} diff --git a/net/src/test/java/com/zfoo/net/base/netty/echoserver1/EchoSeverHandler.java b/net/src/test/java/com/zfoo/net/base/netty/echoserver1/EchoSeverHandler.java new file mode 100644 index 00000000..1eac511a --- /dev/null +++ b/net/src/test/java/com/zfoo/net/base/netty/echoserver1/EchoSeverHandler.java @@ -0,0 +1,35 @@ +package com.zfoo.net.base.netty.echoserver1; + +import com.zfoo.protocol.util.StringUtils; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; + +public class EchoSeverHandler extends ChannelInboundHandlerAdapter { + + + private int count; + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + String body = (String) msg; + System.out.println("The is " + count + "times receive client : [" + body + "]"); + count++; + body = body + StringUtils.DOLLAR; + ByteBuf writeBuffer = Unpooled.copiedBuffer(body.getBytes()); + ctx.writeAndFlush(writeBuffer); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + ctx.flush(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + cause.printStackTrace(); + ctx.close(); + } + +} diff --git a/net/src/test/java/com/zfoo/net/base/netty/echoserver2/EchoClientHandler.java b/net/src/test/java/com/zfoo/net/base/netty/echoserver2/EchoClientHandler.java new file mode 100644 index 00000000..394e8842 --- /dev/null +++ b/net/src/test/java/com/zfoo/net/base/netty/echoserver2/EchoClientHandler.java @@ -0,0 +1,38 @@ +package com.zfoo.net.base.netty.echoserver2; + +import com.zfoo.protocol.util.StringUtils; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; + +public class EchoClientHandler extends ChannelInboundHandlerAdapter { + + public static final String ECHO_REQUEST = "Welcome to netty!" + StringUtils.DOLLAR; + + private int count; + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + ByteBuf firstMessage; + byte[] req = ECHO_REQUEST.getBytes(); + for (int i = 0; i < 100; i++) { + ctx.writeAndFlush(Unpooled.copiedBuffer(req)); + } + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + System.out.println("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh"); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + ctx.flush(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + ctx.close(); + } +} diff --git a/net/src/test/java/com/zfoo/net/base/netty/echoserver2/EchoClientTest.java b/net/src/test/java/com/zfoo/net/base/netty/echoserver2/EchoClientTest.java new file mode 100644 index 00000000..4e83f04b --- /dev/null +++ b/net/src/test/java/com/zfoo/net/base/netty/echoserver2/EchoClientTest.java @@ -0,0 +1,54 @@ +package com.zfoo.net.base.netty.echoserver2; + +import com.zfoo.util.ThreadUtils; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.FixedLengthFrameDecoder; +import io.netty.handler.codec.string.StringDecoder; +import org.junit.Ignore; +import org.junit.Test; + +public class EchoClientTest { + + + public void connect(int port, String host) { + EventLoopGroup group = new NioEventLoopGroup(); + Bootstrap bootstrap = new Bootstrap(); + bootstrap.group(group).channel(NioSocketChannel.class) + .option(ChannelOption.TCP_NODELAY, true) + .handler(new ChildChannelHandler()); + ChannelFuture future = null; + try { + future = bootstrap.connect(host, port).sync(); + future.channel().closeFuture().sync(); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + group.shutdownGracefully(); + } + } + + private class ChildChannelHandler extends ChannelInitializer { + @Override + protected void initChannel(SocketChannel channel) throws Exception { + channel.pipeline().addLast(new FixedLengthFrameDecoder(20)); + channel.pipeline().addLast(new StringDecoder()); + channel.pipeline().addLast(new EchoClientHandler()); + } + } + + @Ignore + @Test + public void clientTest() { + new EchoClientTest().connect(9999, "127.0.0.1"); + System.out.println("hello"); + ThreadUtils.sleep(Long.MAX_VALUE); + } + +} diff --git a/net/src/test/java/com/zfoo/net/base/netty/echoserver2/EchoServerTest.java b/net/src/test/java/com/zfoo/net/base/netty/echoserver2/EchoServerTest.java new file mode 100644 index 00000000..f68af178 --- /dev/null +++ b/net/src/test/java/com/zfoo/net/base/netty/echoserver2/EchoServerTest.java @@ -0,0 +1,76 @@ +package com.zfoo.net.base.netty.echoserver2; + +import com.zfoo.util.ThreadUtils; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.FixedLengthFrameDecoder; +import io.netty.handler.codec.string.StringDecoder; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import org.junit.Ignore; +import org.junit.Test; + +/** + * 定长通信 + * + * @author jaysunxiao + * @version 1.0 + * @since 2017 05.22 18:23 + */ +public class EchoServerTest { + + private int port; + + public EchoServerTest(int port) { + this.port = port; + } + + public void init() { + //配置服务端nio线程组 + EventLoopGroup bossGroup = new NioEventLoopGroup();//服务端接受客户端连接 + EventLoopGroup workerGroup = new NioEventLoopGroup();//SocketChannel的网络读写 + try { + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) + .option(ChannelOption.SO_BACKLOG, 100).handler(new LoggingHandler(LogLevel.INFO)) + .childHandler(new ChildChannelHandler()); + + //绑定端口,同步等待成功 + ChannelFuture future = bootstrap.bind(port).sync(); + //等待服务端监听端口关闭 + future.channel().closeFuture().sync(); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + //优雅的退出,释放线程池资源 + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + } + } + + private class ChildChannelHandler extends ChannelInitializer { + @Override + protected void initChannel(SocketChannel channel) throws Exception { + channel.pipeline().addLast(new FixedLengthFrameDecoder(20)); + channel.pipeline().addLast(new StringDecoder()); + channel.pipeline().addLast(new EchoSeverHandler()); + } + } + + @Ignore + @Test + public void serverTest() { + System.out.println("hello"); + EchoServerTest server = new EchoServerTest(9999); + server.init(); + System.out.println("hello"); + ThreadUtils.sleep(Long.MAX_VALUE); + } + +} diff --git a/net/src/test/java/com/zfoo/net/base/netty/echoserver2/EchoSeverHandler.java b/net/src/test/java/com/zfoo/net/base/netty/echoserver2/EchoSeverHandler.java new file mode 100644 index 00000000..d183e60e --- /dev/null +++ b/net/src/test/java/com/zfoo/net/base/netty/echoserver2/EchoSeverHandler.java @@ -0,0 +1,25 @@ +package com.zfoo.net.base.netty.echoserver2; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; + +public class EchoSeverHandler extends ChannelInboundHandlerAdapter { + + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + System.out.println("Server receive client : [" + msg + "]"); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + ctx.flush(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + cause.printStackTrace(); + ctx.close(); + } + +} diff --git a/net/src/test/java/com/zfoo/net/base/netty/nettyfileserver/ClientHandler.java b/net/src/test/java/com/zfoo/net/base/netty/nettyfileserver/ClientHandler.java new file mode 100644 index 00000000..a6536692 --- /dev/null +++ b/net/src/test/java/com/zfoo/net/base/netty/nettyfileserver/ClientHandler.java @@ -0,0 +1,41 @@ +package com.zfoo.net.base.netty.nettyfileserver; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; + +/** + * @author jaysunxiao + * @version 1.0 + * @since 2017 05.31 10:22 + */ +public class ClientHandler extends ChannelInboundHandlerAdapter { + + private String filePath; + + + public ClientHandler(String filePath) { + this.filePath = filePath; + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + byte[] req = new String(filePath + System.getProperty("line.separator")).getBytes(); + ByteBuf message = Unpooled.buffer(req.length); + message.writeBytes(req); + ctx.writeAndFlush(message); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + String body = (String) msg; + System.out.println(body); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + ctx.close(); + } +} + diff --git a/net/src/test/java/com/zfoo/net/base/netty/nettyfileserver/FileClientTest.java b/net/src/test/java/com/zfoo/net/base/netty/nettyfileserver/FileClientTest.java new file mode 100644 index 00000000..1a26b07f --- /dev/null +++ b/net/src/test/java/com/zfoo/net/base/netty/nettyfileserver/FileClientTest.java @@ -0,0 +1,73 @@ +package com.zfoo.net.base.netty.nettyfileserver; + +import com.zfoo.util.ThreadUtils; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.LineBasedFrameDecoder; +import io.netty.handler.codec.string.StringDecoder; +import io.netty.handler.codec.string.StringEncoder; +import io.netty.util.CharsetUtil; +import org.junit.Ignore; +import org.junit.Test; + +/** + * @author jaysunxiao + * @version 1.0 + * @since 2017 05.31 10:22 + */ +public class FileClientTest { + + private int port; + private String host; + private String filePath; + + public FileClientTest(int port, String host, String filePath) { + this.port = port; + this.host = host; + this.filePath = filePath; + } + + public void connect() { + EventLoopGroup group = new NioEventLoopGroup(); + Bootstrap bootstrap = new Bootstrap(); + bootstrap.group(group).channel(NioSocketChannel.class) + .option(ChannelOption.TCP_NODELAY, true) + .handler(new ChildChannelHandler()); + ChannelFuture future = null; + try { + future = bootstrap.connect(host, port).sync(); + future.channel().closeFuture().sync(); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + group.shutdownGracefully(); + } + } + + private class ChildChannelHandler extends ChannelInitializer { + @Override + protected void initChannel(SocketChannel channel) throws Exception { + channel.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8)); + channel.pipeline().addLast(new LineBasedFrameDecoder(1024)); + channel.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));//三者组合起来就是文本换行编码解码器 + channel.pipeline().addLast(new ClientHandler(filePath)); + } + } + + @Ignore + @Test + public void clientTest() { + FileClientTest client = new FileClientTest(9999, "127.0.0.1", "rainbow.txt"); + client.connect(); + System.out.println("hello"); + ThreadUtils.sleep(Long.MAX_VALUE); + } + + +} diff --git a/net/src/test/java/com/zfoo/net/base/netty/nettyfileserver/FileServerHandler.java b/net/src/test/java/com/zfoo/net/base/netty/nettyfileserver/FileServerHandler.java new file mode 100644 index 00000000..634fa905 --- /dev/null +++ b/net/src/test/java/com/zfoo/net/base/netty/nettyfileserver/FileServerHandler.java @@ -0,0 +1,56 @@ +package com.zfoo.net.base.netty.nettyfileserver; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.DefaultFileRegion; +import io.netty.channel.FileRegion; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.RandomAccessFile; + +/** + * @author jaysunxiao + * @version 1.0 + * @since 2017 05.31 09:58 + */ +public class FileServerHandler extends ChannelInboundHandlerAdapter { + + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + System.out.println("Server receive client : [" + msg + "]"); + File file = new File((String) msg); + if (file.exists() && !file.isFile()) { + ctx.writeAndFlush("not a file:" + file + System.getProperty("line.separator")); + return; + } + ctx.write(file + " " + file.length()); + RandomAccessFile randomAccessFile = null; + FileRegion region = null; + try { + randomAccessFile = new RandomAccessFile((String) msg, "r"); + region = new DefaultFileRegion(randomAccessFile.getChannel(), 0, randomAccessFile.length()); + ctx.write(region); + ctx.writeAndFlush(System.getProperty("line.separator")); + randomAccessFile.close(); + } catch (FileNotFoundException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + ctx.flush(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + cause.printStackTrace(); + ctx.close(); + } + +} diff --git a/net/src/test/java/com/zfoo/net/base/netty/nettyfileserver/FileServerTest.java b/net/src/test/java/com/zfoo/net/base/netty/nettyfileserver/FileServerTest.java new file mode 100644 index 00000000..b4a46749 --- /dev/null +++ b/net/src/test/java/com/zfoo/net/base/netty/nettyfileserver/FileServerTest.java @@ -0,0 +1,79 @@ +package com.zfoo.net.base.netty.nettyfileserver; + +import com.zfoo.util.ThreadUtils; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.LineBasedFrameDecoder; +import io.netty.handler.codec.string.StringDecoder; +import io.netty.handler.codec.string.StringEncoder; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import io.netty.util.CharsetUtil; +import org.junit.Ignore; +import org.junit.Test; + +/** + * 通过netty的通信,和java原生的RandomAccessFile来实现小文件的传输 + * + * @author jaysunxiao + * @version 1.0 + * @since 2017 05.31 09:49 + */ +public class FileServerTest { + + private int port; + + public FileServerTest(int port) { + this.port = port; + } + + public void init() { + //配置服务端nio线程组 + EventLoopGroup bossGroup = new NioEventLoopGroup();//服务端接受客户端连接 + EventLoopGroup workerGroup = new NioEventLoopGroup();//SocketChannel的网络读写 + try { + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) + .option(ChannelOption.SO_BACKLOG, 100).handler(new LoggingHandler(LogLevel.INFO)) + .childHandler(new ChildChannelHandler()); + + //绑定端口,同步等待成功 + ChannelFuture future = bootstrap.bind(port).sync(); + //等待服务端监听端口关闭 + future.channel().closeFuture().sync(); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + //优雅的退出,释放线程池资源 + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + } + } + + private class ChildChannelHandler extends ChannelInitializer { + @Override + protected void initChannel(SocketChannel channel) throws Exception { + channel.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8)); + channel.pipeline().addLast(new LineBasedFrameDecoder(1024)); + channel.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));//三者组合起来就是文本换行编码解码器 + channel.pipeline().addLast(new FileServerHandler()); + } + } + + @Ignore + @Test + public void serverTest() { + System.out.println("hello"); + FileServerTest server = new FileServerTest(9999); + server.init(); + System.out.println("hello"); + ThreadUtils.sleep(Long.MAX_VALUE); + } + +} diff --git a/net/src/test/java/com/zfoo/net/base/netty/subscribe/ClientHandler.java b/net/src/test/java/com/zfoo/net/base/netty/subscribe/ClientHandler.java new file mode 100644 index 00000000..17b9071f --- /dev/null +++ b/net/src/test/java/com/zfoo/net/base/netty/subscribe/ClientHandler.java @@ -0,0 +1,36 @@ +package com.zfoo.net.base.netty.subscribe; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; + +/** + * @author jaysunxiao + * @version 1.0 + * @since 2017 05.23 18:02 + */ +public class ClientHandler extends ChannelInboundHandlerAdapter { + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + for (int i = 0; i < 10; i++) { + //*****一定要实现Serializable,不实现不会抛异常,很难查找bug + ctx.write(new SubscribeReq(i, "sun", "nettyBook", "1885630", "shanghai")); + } + ctx.flush(); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + System.out.println(msg); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + ctx.flush(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + ctx.close(); + } +} diff --git a/net/src/test/java/com/zfoo/net/base/netty/subscribe/Serverhandler.java b/net/src/test/java/com/zfoo/net/base/netty/subscribe/Serverhandler.java new file mode 100644 index 00000000..7a0a12da --- /dev/null +++ b/net/src/test/java/com/zfoo/net/base/netty/subscribe/Serverhandler.java @@ -0,0 +1,32 @@ +package com.zfoo.net.base.netty.subscribe; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; + +/** + * @author jaysunxiao + * @version 1.0 + * @since 2017 05.23 17:13 + */ +public class Serverhandler extends ChannelInboundHandlerAdapter { + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + SubscribeReq req = (SubscribeReq) msg; + System.out.println("Server accept client subscribe req: " + req.toString()); + SubscribeResp resp = new SubscribeResp(req.getReqID(), 0, "subscribe successfully!"); + //*****一定要实现Serializable,不实现不会抛异常,很难查找bug + ctx.writeAndFlush(resp); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + ctx.flush(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + cause.printStackTrace(); + ctx.close(); + } +} diff --git a/net/src/test/java/com/zfoo/net/base/netty/subscribe/SubscribeClientTest.java b/net/src/test/java/com/zfoo/net/base/netty/subscribe/SubscribeClientTest.java new file mode 100644 index 00000000..af3b2e52 --- /dev/null +++ b/net/src/test/java/com/zfoo/net/base/netty/subscribe/SubscribeClientTest.java @@ -0,0 +1,61 @@ +package com.zfoo.net.base.netty.subscribe; + +import com.zfoo.util.ThreadUtils; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.serialization.ClassResolvers; +import io.netty.handler.codec.serialization.ObjectDecoder; +import io.netty.handler.codec.serialization.ObjectEncoder; +import org.junit.Ignore; +import org.junit.Test; + +/** + * @author jaysunxiao + * @version 1.0 + * @since 2017 05.23 18:02 + */ +public class SubscribeClientTest { + + + public void connect(int port, String host) { + EventLoopGroup group = new NioEventLoopGroup(); + Bootstrap bootstrap = new Bootstrap(); + bootstrap.group(group).channel(NioSocketChannel.class) + .option(ChannelOption.TCP_NODELAY, true) + .handler(new ChildChannelHandler()); + ChannelFuture future = null; + try { + future = bootstrap.connect(host, port).sync(); + future.channel().closeFuture().sync(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + group.shutdownGracefully(); + } + } + + private class ChildChannelHandler extends ChannelInitializer { + @Override + protected void initChannel(SocketChannel channel) throws Exception { + channel.pipeline().addLast(new ObjectDecoder(1024 * 1024 + , ClassResolvers.cacheDisabled(this.getClass().getClassLoader()))); + channel.pipeline().addLast(new ObjectEncoder()); + channel.pipeline().addLast(new ClientHandler()); + } + } + + @Ignore + @Test + public void clientTest() { + new SubscribeClientTest().connect(9999, "127.0.0.1"); + System.out.println("hello"); + ThreadUtils.sleep(Long.MAX_VALUE); + } + +} diff --git a/net/src/test/java/com/zfoo/net/base/netty/subscribe/SubscribeReq.java b/net/src/test/java/com/zfoo/net/base/netty/subscribe/SubscribeReq.java new file mode 100644 index 00000000..ec6598b7 --- /dev/null +++ b/net/src/test/java/com/zfoo/net/base/netty/subscribe/SubscribeReq.java @@ -0,0 +1,82 @@ +package com.zfoo.net.base.netty.subscribe; + +import java.io.Serializable; + +/** + * @author jaysunxiao + * @version 1.0 + * @since 2017 05.23 16:51 + */ +public class SubscribeReq implements Serializable { + + private static final long serialVersionUID = 1L; + + private int reqID; + + private String userName; + + private String productName; + + private String phoneNumber; + + private String address; + + public SubscribeReq(int reqID, String userName, String productName, String phoneNumber, String address) { + this.reqID = reqID; + this.userName = userName; + this.productName = productName; + this.phoneNumber = phoneNumber; + this.address = address; + } + + public int getReqID() { + return reqID; + } + + public void setReqID(int reqID) { + this.reqID = reqID; + } + + public String getUserName() { + return userName; + } + + public void setUserName(String userName) { + this.userName = userName; + } + + public String getProductName() { + return productName; + } + + public void setProductName(String productName) { + this.productName = productName; + } + + public String getPhoneNumber() { + return phoneNumber; + } + + public void setPhoneNumber(String phoneNumber) { + this.phoneNumber = phoneNumber; + } + + public String getAddress() { + return address; + } + + public void setAddress(String address) { + this.address = address; + } + + @Override + public String toString() { + return "SubscribeReq{" + + "reqID=" + reqID + + ", userName='" + userName + '\'' + + ", productName='" + productName + '\'' + + ", phoneNumber='" + phoneNumber + '\'' + + ", address='" + address + '\'' + + '}'; + } +} diff --git a/net/src/test/java/com/zfoo/net/base/netty/subscribe/SubscribeResp.java b/net/src/test/java/com/zfoo/net/base/netty/subscribe/SubscribeResp.java new file mode 100644 index 00000000..cb77784c --- /dev/null +++ b/net/src/test/java/com/zfoo/net/base/netty/subscribe/SubscribeResp.java @@ -0,0 +1,58 @@ +package com.zfoo.net.base.netty.subscribe; + +import java.io.Serializable; + +/** + * @author jaysunxiao + * @version 1.0 + * @since 2017 05.23 16:52 + */ +public class SubscribeResp implements Serializable { + + private static final long serialVersionUID = 1L; + + private int respID; + + private int respCode; + + private String desc; + + public SubscribeResp(int respID, int respCode, String desc) { + this.respID = respID; + this.respCode = respCode; + this.desc = desc; + } + + public int getRespID() { + return respID; + } + + public void setRespID(int respID) { + this.respID = respID; + } + + public int getRespCode() { + return respCode; + } + + public void setRespCode(int respCode) { + this.respCode = respCode; + } + + public String getDesc() { + return desc; + } + + public void setDesc(String desc) { + this.desc = desc; + } + + @Override + public String toString() { + return "SubscribeResp{" + + "respID=" + respID + + ", respCode=" + respCode + + ", desc='" + desc + '\'' + + '}'; + } +} diff --git a/net/src/test/java/com/zfoo/net/base/netty/subscribe/SubscribeServerTest.java b/net/src/test/java/com/zfoo/net/base/netty/subscribe/SubscribeServerTest.java new file mode 100644 index 00000000..2ce8f2f0 --- /dev/null +++ b/net/src/test/java/com/zfoo/net/base/netty/subscribe/SubscribeServerTest.java @@ -0,0 +1,80 @@ +package com.zfoo.net.base.netty.subscribe; + +import com.zfoo.util.ThreadUtils; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.serialization.ClassResolvers; +import io.netty.handler.codec.serialization.ObjectDecoder; +import io.netty.handler.codec.serialization.ObjectEncoder; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import org.junit.Ignore; +import org.junit.Test; + +/** + * 对象读写序列化通信 + * + * @author jaysunxiao + * @version 1.0 + * @since 2017 05.23 16:51 + */ +public class SubscribeServerTest { + + private int port; + + public SubscribeServerTest(int port) { + this.port = port; + } + + public void init() { + //配置服务端nio线程组 + EventLoopGroup bossGroup = new NioEventLoopGroup();//服务端接受客户端连接 + EventLoopGroup workerGroup = new NioEventLoopGroup();//SocketChannel的网络读写 + try { + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) + .option(ChannelOption.SO_BACKLOG, 100).handler(new LoggingHandler(LogLevel.INFO)) + .childHandler(new ChildChannelHandler()); + + //绑定端口,同步等待成功 + ChannelFuture future = bootstrap.bind(port).sync(); + //等待服务端监听端口关闭 + future.channel().closeFuture().sync(); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + //优雅的退出,释放线程池资源 + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + } + } + + private class ChildChannelHandler extends ChannelInitializer { + @Override + protected void initChannel(SocketChannel channel) throws Exception { + channel.pipeline().addLast(new ObjectDecoder(1024 * 1024 + , ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader()))); + //*****一定要实现Serializable,不实现不会抛异常,很难查找bug + channel.pipeline().addLast(new ObjectEncoder()); + channel.pipeline().addLast(new Serverhandler()); + } + } + + + @Ignore + @Test + public void serverTest() { + System.out.println("hello"); + SubscribeServerTest server = new SubscribeServerTest(9999); + server.init(); + System.out.println("hello"); + ThreadUtils.sleep(Long.MAX_VALUE); + } + +} diff --git a/net/src/test/java/com/zfoo/net/base/netty/timeserver1/TimeClientHandler.java b/net/src/test/java/com/zfoo/net/base/netty/timeserver1/TimeClientHandler.java new file mode 100644 index 00000000..38f22cc3 --- /dev/null +++ b/net/src/test/java/com/zfoo/net/base/netty/timeserver1/TimeClientHandler.java @@ -0,0 +1,39 @@ +package com.zfoo.net.base.netty.timeserver1; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; + +public class TimeClientHandler extends ChannelInboundHandlerAdapter { + + + public TimeClientHandler() { + + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + ByteBuf firstMessage; + byte[] req = "QUERY TIME ORDER".getBytes(); + for (int i = 0; i < 100; i++) { + firstMessage = Unpooled.buffer(req.length); + firstMessage.writeBytes(req); + ctx.writeAndFlush(firstMessage); + } + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + ByteBuf readBuffer = (ByteBuf) msg; + byte[] read = new byte[readBuffer.readableBytes()]; + readBuffer.readBytes(read); + String body = new String(read, "UTF-8"); + System.out.println("Now is :" + body); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + ctx.close(); + } +} diff --git a/net/src/test/java/com/zfoo/net/base/netty/timeserver1/TimeClientTest.java b/net/src/test/java/com/zfoo/net/base/netty/timeserver1/TimeClientTest.java new file mode 100644 index 00000000..40e0e411 --- /dev/null +++ b/net/src/test/java/com/zfoo/net/base/netty/timeserver1/TimeClientTest.java @@ -0,0 +1,49 @@ +package com.zfoo.net.base.netty.timeserver1; + +import com.zfoo.util.ThreadUtils; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import org.junit.Ignore; +import org.junit.Test; + +public class TimeClientTest { + + public void connect(int port, String host) { + EventLoopGroup group = new NioEventLoopGroup(); + Bootstrap bootstrap = new Bootstrap(); + bootstrap.group(group).channel(NioSocketChannel.class) + .option(ChannelOption.TCP_NODELAY, true) + .handler(new ChildChannelHandler()); + ChannelFuture future = null; + try { + future = bootstrap.connect(host, port).sync(); + future.channel().closeFuture().sync(); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + group.shutdownGracefully(); + } + } + + private class ChildChannelHandler extends ChannelInitializer { + @Override + protected void initChannel(SocketChannel channel) throws Exception { + channel.pipeline().addLast(new TimeClientHandler()); + } + } + + @Ignore + @Test + public void clientTest() { + new TimeClientTest().connect(9999, "127.0.0.1"); + ThreadUtils.sleep(Long.MAX_VALUE); + } + + +} diff --git a/net/src/test/java/com/zfoo/net/base/netty/timeserver1/TimeServerHandler.java b/net/src/test/java/com/zfoo/net/base/netty/timeserver1/TimeServerHandler.java new file mode 100644 index 00000000..4ffcad59 --- /dev/null +++ b/net/src/test/java/com/zfoo/net/base/netty/timeserver1/TimeServerHandler.java @@ -0,0 +1,36 @@ +package com.zfoo.net.base.netty.timeserver1; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; + +import java.util.Date; + +public class TimeServerHandler extends ChannelInboundHandlerAdapter { + + private int count; + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + ByteBuf buffer = (ByteBuf) msg; + byte[] req = new byte[buffer.readableBytes()]; + buffer.readBytes(req); + String body = new String(req, "UTF-8"); + System.out.println("The time server receive order :" + body + count++); + String currentTime = "QUERY TIME ORDER" + new Date().toString(); + ByteBuf writeBuffer = Unpooled.copiedBuffer(currentTime.getBytes()); + ctx.writeAndFlush(writeBuffer); + System.out.println(count++); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + ctx.flush(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + ctx.close(); + } +} diff --git a/net/src/test/java/com/zfoo/net/base/netty/timeserver1/TimeServerTest.java b/net/src/test/java/com/zfoo/net/base/netty/timeserver1/TimeServerTest.java new file mode 100644 index 00000000..2b62a51e --- /dev/null +++ b/net/src/test/java/com/zfoo/net/base/netty/timeserver1/TimeServerTest.java @@ -0,0 +1,66 @@ +package com.zfoo.net.base.netty.timeserver1; + +import com.zfoo.util.ThreadUtils; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import org.junit.Ignore; +import org.junit.Test; + +/** + * 没有考虑到半包读写的服务器 + * + * @author jaysunxiao + * @version 1.0 + * @since 2017 05.22 18:23 + */ +public class TimeServerTest { + + private int port; + + public TimeServerTest(int port) { + this.port = port; + } + + public void init() { + //配置服务端nio线程组 + EventLoopGroup bossGroup = new NioEventLoopGroup();//服务端接受客户端连接 + EventLoopGroup workerGroup = new NioEventLoopGroup();//SocketChannel的网络读写 + try { + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) + .option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ChildChannelHandler()); + //绑定端口,同步等待成功 + ChannelFuture future = bootstrap.bind(port).sync(); + //等待服务端监听端口关闭 + future.channel().closeFuture().sync(); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + //优雅的退出,释放线程池资源 + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + } + } + + private class ChildChannelHandler extends ChannelInitializer { + @Override + protected void initChannel(SocketChannel channel) throws Exception { + channel.pipeline().addLast(new TimeServerHandler()); + } + } + + @Ignore + @Test + public void serverTest() { + TimeServerTest server = new TimeServerTest(9999); + server.init(); + ThreadUtils.sleep(Long.MAX_VALUE); + } + +} diff --git a/net/src/test/java/com/zfoo/net/base/netty/timeserver2/TimeClientHandler.java b/net/src/test/java/com/zfoo/net/base/netty/timeserver2/TimeClientHandler.java new file mode 100644 index 00000000..908ff564 --- /dev/null +++ b/net/src/test/java/com/zfoo/net/base/netty/timeserver2/TimeClientHandler.java @@ -0,0 +1,36 @@ +package com.zfoo.net.base.netty.timeserver2; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; + +public class TimeClientHandler extends ChannelInboundHandlerAdapter { + + + public TimeClientHandler() { + + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + ByteBuf firstMessage; + byte[] req = new String("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes(); + for (int i = 0; i < 100; i++) { + firstMessage = Unpooled.buffer(req.length); + firstMessage.writeBytes(req); + ctx.writeAndFlush(firstMessage); + } + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + String body = (String) msg; + System.out.println("Now is :" + body); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + ctx.close(); + } +} diff --git a/net/src/test/java/com/zfoo/net/base/netty/timeserver2/TimeClientTest.java b/net/src/test/java/com/zfoo/net/base/netty/timeserver2/TimeClientTest.java new file mode 100644 index 00000000..eb78c903 --- /dev/null +++ b/net/src/test/java/com/zfoo/net/base/netty/timeserver2/TimeClientTest.java @@ -0,0 +1,54 @@ +package com.zfoo.net.base.netty.timeserver2; + +import com.zfoo.util.ThreadUtils; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.LineBasedFrameDecoder; +import io.netty.handler.codec.string.StringDecoder; +import org.junit.Ignore; +import org.junit.Test; + +public class TimeClientTest { + + public void connect(int port, String host) { + EventLoopGroup group = new NioEventLoopGroup(); + Bootstrap bootstrap = new Bootstrap(); + bootstrap.group(group).channel(NioSocketChannel.class) + .option(ChannelOption.TCP_NODELAY, true) + .handler(new ChildChannelHandler()); + ChannelFuture future = null; + try { + future = bootstrap.connect(host, port).sync(); + future.channel().closeFuture().sync(); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + group.shutdownGracefully(); + } + } + + private class ChildChannelHandler extends ChannelInitializer { + @Override + protected void initChannel(SocketChannel channel) throws Exception { + channel.pipeline().addLast(new LineBasedFrameDecoder(1024)); + channel.pipeline().addLast(new StringDecoder()); + channel.pipeline().addLast(new TimeClientHandler()); + } + } + + @Ignore + @Test + public void clientTest() { + new TimeClientTest().connect(9999, "127.0.0.1"); + System.out.println("hello"); + ThreadUtils.sleep(Long.MAX_VALUE); + } + + +} diff --git a/net/src/test/java/com/zfoo/net/base/netty/timeserver2/TimeServerHandler.java b/net/src/test/java/com/zfoo/net/base/netty/timeserver2/TimeServerHandler.java new file mode 100644 index 00000000..6f9906ce --- /dev/null +++ b/net/src/test/java/com/zfoo/net/base/netty/timeserver2/TimeServerHandler.java @@ -0,0 +1,33 @@ +package com.zfoo.net.base.netty.timeserver2; + +import com.zfoo.protocol.util.FileUtils; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; + +import java.util.Date; + +public class TimeServerHandler extends ChannelInboundHandlerAdapter { + + private int count; + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + String body = (String) msg; + System.out.println("The time server receive order :" + body + count++); + String currentTime = "QUERY TIME ORDER" + new Date().toString() + FileUtils.LS; + ByteBuf writeBuffer = Unpooled.copiedBuffer(currentTime.getBytes()); + ctx.writeAndFlush(writeBuffer); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + ctx.flush(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + ctx.close(); + } +} diff --git a/net/src/test/java/com/zfoo/net/base/netty/timeserver2/TimeServerTest.java b/net/src/test/java/com/zfoo/net/base/netty/timeserver2/TimeServerTest.java new file mode 100644 index 00000000..4e6de184 --- /dev/null +++ b/net/src/test/java/com/zfoo/net/base/netty/timeserver2/TimeServerTest.java @@ -0,0 +1,72 @@ +package com.zfoo.net.base.netty.timeserver2; + +import com.zfoo.util.ThreadUtils; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.LineBasedFrameDecoder; +import io.netty.handler.codec.string.StringDecoder; +import org.junit.Ignore; +import org.junit.Test; + +/** + * 通过换行符解决半包读写的问题 + * + * @author jaysunxiao + * @version 1.0 + * @since 2017 05.22 18:23 + */ +public class TimeServerTest { + + private int port; + + public TimeServerTest(int port) { + this.port = port; + } + + public void init() { + //配置服务端nio线程组 + EventLoopGroup bossGroup = new NioEventLoopGroup();//服务端接受客户端连接 + EventLoopGroup workerGroup = new NioEventLoopGroup();//SocketChannel的网络读写 + try { + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) + .option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ChildChannelHandler()); + //绑定端口,同步等待成功 + ChannelFuture future = bootstrap.bind(port).sync(); + //等待服务端监听端口关闭 + future.channel().closeFuture().sync(); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + //优雅的退出,释放线程池资源 + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + } + } + + private class ChildChannelHandler extends ChannelInitializer { + @Override + protected void initChannel(SocketChannel channel) throws Exception { + channel.pipeline().addLast(new LineBasedFrameDecoder(1024)); + channel.pipeline().addLast(new StringDecoder()); + channel.pipeline().addLast(new TimeServerHandler()); + } + } + + @Ignore + @Test + public void serverTest() { + System.out.println("hello"); + TimeServerTest server = new TimeServerTest(9999); + server.init(); + System.out.println("hello"); + ThreadUtils.sleep(Long.MAX_VALUE); + } + +} diff --git a/net/src/test/java/com/zfoo/net/base/netty/udpserver/UDPClientHandler.java b/net/src/test/java/com/zfoo/net/base/netty/udpserver/UDPClientHandler.java new file mode 100644 index 00000000..c3b9e5f9 --- /dev/null +++ b/net/src/test/java/com/zfoo/net/base/netty/udpserver/UDPClientHandler.java @@ -0,0 +1,31 @@ +package com.zfoo.net.base.netty.udpserver; + +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.socket.DatagramPacket; +import io.netty.util.CharsetUtil; + +/** + * @author jaysunxiao + * @version 1.0 + * @since 2017 05.27 17:48 + */ +public class UDPClientHandler extends SimpleChannelInboundHandler { + + private static String DICTIONARY = "bbb"; + + @Override + protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception { + String req = packet.content().toString(CharsetUtil.UTF_8); + System.out.println(req); + ctx.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer(DICTIONARY, CharsetUtil.UTF_8) + , packet.sender())); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + ctx.close(); + cause.printStackTrace(); + } +} diff --git a/net/src/test/java/com/zfoo/net/base/netty/udpserver/UDPClientTest.java b/net/src/test/java/com/zfoo/net/base/netty/udpserver/UDPClientTest.java new file mode 100644 index 00000000..cca34e24 --- /dev/null +++ b/net/src/test/java/com/zfoo/net/base/netty/udpserver/UDPClientTest.java @@ -0,0 +1,71 @@ +package com.zfoo.net.base.netty.udpserver; + +import com.zfoo.util.ThreadUtils; +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.DatagramPacket; +import io.netty.channel.socket.nio.NioDatagramChannel; +import io.netty.util.CharsetUtil; +import org.junit.Ignore; +import org.junit.Test; + +import java.net.InetSocketAddress; + +/** + * @author jaysunxiao + * @version 1.0 + * @since 2017 05.27 17:48 + */ +public class UDPClientTest { + + + private int port; + + public UDPClientTest(int port) { + this.port = port; + } + + public void init() { + //配置服务端nio线程组 + EventLoopGroup group = new NioEventLoopGroup();//服务端接受客户端连接 + try { + Bootstrap bootstrap = new Bootstrap(); + bootstrap.group(group).channel(NioDatagramChannel.class) + .option(ChannelOption.SO_BROADCAST, true).handler(new UDPClientHandler()); + //绑定端口,同步等待成功 + ChannelFuture future = bootstrap.bind(0).sync(); + //等待服务端监听端口关闭 + Channel channel = future.channel(); + + //向内网的所有机器广播UDP消息 + channel.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer("client" + , CharsetUtil.UTF_8), new InetSocketAddress("127.0.0.1", port))).sync(); + + if (!channel.closeFuture().await(5000)) { + System.out.println("查询超时!"); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + //优雅的退出,释放线程池资源 + group.shutdownGracefully(); + } + } + + @Ignore + @Test + public void clientTest() { + System.out.println("hello"); + UDPClientTest server = new UDPClientTest(9999); + server.init(); + System.out.println("hello"); + ThreadUtils.sleep(Long.MAX_VALUE); + } + + +} diff --git a/net/src/test/java/com/zfoo/net/base/netty/udpserver/UDPServerHandler.java b/net/src/test/java/com/zfoo/net/base/netty/udpserver/UDPServerHandler.java new file mode 100644 index 00000000..6965fcc1 --- /dev/null +++ b/net/src/test/java/com/zfoo/net/base/netty/udpserver/UDPServerHandler.java @@ -0,0 +1,33 @@ +package com.zfoo.net.base.netty.udpserver; + +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.socket.DatagramPacket; +import io.netty.util.CharsetUtil; + +/** + * @author jaysunxiao + * @version 1.0 + * @since 2017 05.27 17:27 + */ +public class UDPServerHandler extends SimpleChannelInboundHandler { + + private static String DICTIONARY = "aaa"; + + @Override + protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception { + String req = packet.content().toString(CharsetUtil.UTF_8); + System.out.println(req); + if (req.equals("client")) { + ctx.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer(UDPServerHandler.DICTIONARY, CharsetUtil.UTF_8) + , packet.sender())); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + ctx.close(); + cause.printStackTrace(); + } +} diff --git a/net/src/test/java/com/zfoo/net/base/netty/udpserver/UDPServerTest.java b/net/src/test/java/com/zfoo/net/base/netty/udpserver/UDPServerTest.java new file mode 100644 index 00000000..fbcb9c9c --- /dev/null +++ b/net/src/test/java/com/zfoo/net/base/netty/udpserver/UDPServerTest.java @@ -0,0 +1,58 @@ +package com.zfoo.net.base.netty.udpserver; + +import com.zfoo.util.ThreadUtils; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioDatagramChannel; +import org.junit.Ignore; +import org.junit.Test; + +/** + * UDP通信 + * + * @author jaysunxiao + * @version 1.0 + * @since 2017 05.27 17:24 + */ +public class UDPServerTest { + + private int port; + + public UDPServerTest(int port) { + this.port = port; + } + + public void init() { + //配置服务端nio线程组 + EventLoopGroup group = new NioEventLoopGroup();//服务端接受客户端连接 + try { + Bootstrap bootstrap = new Bootstrap(); + bootstrap.group(group).channel(NioDatagramChannel.class) + .option(ChannelOption.SO_BROADCAST, true).handler(new UDPServerHandler()); + //绑定端口,同步等待成功 + ChannelFuture future = bootstrap.bind(port).sync(); + //等待服务端监听端口关闭 + future.channel().closeFuture().await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + //优雅的退出,释放线程池资源 + group.shutdownGracefully(); + } + } + + + @Ignore + @Test + public void serverTest() { + System.out.println("hello"); + UDPServerTest server = new UDPServerTest(9999); + server.init(); + System.out.println("hello"); + ThreadUtils.sleep(Long.MAX_VALUE); + } + +} diff --git a/net/src/test/java/com/zfoo/net/base/nio/filechannel/FileReaderTest.java b/net/src/test/java/com/zfoo/net/base/nio/filechannel/FileReaderTest.java new file mode 100644 index 00000000..4eb304fb --- /dev/null +++ b/net/src/test/java/com/zfoo/net/base/nio/filechannel/FileReaderTest.java @@ -0,0 +1,47 @@ +package com.zfoo.net.base.nio.filechannel; + +import com.zfoo.util.ThreadUtils; +import org.junit.Ignore; +import org.junit.Test; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; + +@Ignore +public class FileReaderTest { + + @Test + public void test() throws IOException { + RandomAccessFile file = new RandomAccessFile("rainbow.txt", "rw"); + //FileChannel无法设置为非阻塞模式,它总是运行在阻塞模式下 + FileChannel fileChannel = file.getChannel();//NIO 中都从一个Channel 开始 + + //create buffer with capacity of 48 bytes + ByteBuffer buf = ByteBuffer.allocate(1000000000);//数据可以从Channel读到Buffer中,也可以从Buffer 写到Channel中 + //read into buffer + int bytesRead = fileChannel.read(buf); + + while (bytesRead != -1) { + + System.out.println("Read " + bytesRead); + //make buffer ready for read + buf.flip(); + + while (buf.hasRemaining()) { + System.out.print((char) buf.get()); + } + + //make buffer ready for writing + buf.clear(); + bytesRead = fileChannel.read(buf); + } + fileChannel.close(); + file.close(); + + ThreadUtils.sleep(Long.MAX_VALUE); + } + + +} diff --git a/net/src/test/java/com/zfoo/net/base/nio/filechannel/FileWriterTest.java b/net/src/test/java/com/zfoo/net/base/nio/filechannel/FileWriterTest.java new file mode 100644 index 00000000..81711bac --- /dev/null +++ b/net/src/test/java/com/zfoo/net/base/nio/filechannel/FileWriterTest.java @@ -0,0 +1,71 @@ +package com.zfoo.net.base.nio.filechannel; + +import com.zfoo.util.ThreadUtils; +import org.junit.Ignore; +import org.junit.Test; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; + +@Ignore +public class FileWriterTest { + + @Test + public void test() throws IOException { + writeToFile(); + ThreadUtils.sleep(Long.MAX_VALUE); + } + + + //fromFile-->toFile + public void transferFrom() throws IOException { + RandomAccessFile fromFile = new RandomAccessFile("fromFile.txt", "rw"); + FileChannel fromChannel = fromFile.getChannel(); + + RandomAccessFile toFile = new RandomAccessFile("toFile.txt", "rw"); + FileChannel toChannel = toFile.getChannel(); + + long position = 0; + long count = fromChannel.size(); + + toChannel.transferFrom(fromChannel, position, count); + } + + + //fromFile-->toFile + public void transferTo() throws IOException { + RandomAccessFile fromFile = new RandomAccessFile("fromFile.txt", "rw"); + FileChannel fromChannel = fromFile.getChannel(); + + RandomAccessFile toFile = new RandomAccessFile("toFile.txt", "rw"); + FileChannel toChannel = toFile.getChannel(); + + long position = 0; + long count = fromChannel.size(); + + fromChannel.transferTo(position, count, toChannel); + } + + public void writeToFile() throws IOException { + RandomAccessFile file = new RandomAccessFile("rainbow.txt", "rw"); + FileChannel fileChannel = file.getChannel(); + + String newData = "New String to write to file..." + System.currentTimeMillis(); + + ByteBuffer buf = ByteBuffer.allocate(48); + buf.clear(); + buf.put(newData.getBytes()); + + buf.flip(); + + while (buf.hasRemaining()) { + fileChannel.write(buf); + } + fileChannel.close(); + file.close(); + } + + +} diff --git a/net/src/test/java/com/zfoo/net/base/nio/nio/NioClientTest.java b/net/src/test/java/com/zfoo/net/base/nio/nio/NioClientTest.java new file mode 100644 index 00000000..6638df46 --- /dev/null +++ b/net/src/test/java/com/zfoo/net/base/nio/nio/NioClientTest.java @@ -0,0 +1,135 @@ +package com.zfoo.net.base.nio.nio; + +import com.zfoo.util.ThreadUtils; +import org.junit.Ignore; +import org.junit.Test; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.util.Iterator; + +public class NioClientTest implements Runnable { + + private String ip; + private int port; + + private Selector selector; + private SocketChannel channel; + private volatile boolean stop; + + public NioClientTest(String ip, int port) { + this.ip = ip; + this.port = port; + } + + public void init() { + try { + channel = SocketChannel.open(); + channel.configureBlocking(false); + channel.connect(new InetSocketAddress(ip, port)); + + this.selector = Selector.open(); + channel.register(selector, SelectionKey.OP_CONNECT); + } catch (IOException e) { + e.printStackTrace(); + System.out.println("非正常退出"); + System.exit(1); + } + } + + @Override + public void run() { + System.out.println("客户端启动成功!"); + while (!stop) { + //不会阻塞 + try { + selector.select(1001); + Iterator iterator = selector.selectedKeys().iterator(); + while (iterator.hasNext()) { + SelectionKey key = iterator.next(); + iterator.remove(); + try { + handleInput(key); + } catch (Exception e) { + if (key != null) { + key.cancel(); + if (key.channel() != null) { + key.channel().close(); + } + } + } + + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + if (selector != null) { + try { + selector.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + public void handleInput(SelectionKey key) throws IOException { + if (key.isValid()) { + //连接事件发生 + if (key.isConnectable()) { + SocketChannel channel = (SocketChannel) key.channel(); + //如果正在连接,则完成连接 + if (channel.finishConnect()) { + channel.finishConnect(); + } + channel.configureBlocking(false); + channel.write(ByteBuffer.wrap(new String("hello server!this is client!").getBytes())); + channel.register(selector, SelectionKey.OP_READ); + } + + if (key.isReadable()) { + SocketChannel channel = (SocketChannel) key.channel(); + ByteBuffer readBuffer = ByteBuffer.allocate(1024); + int readBytes = channel.read(readBuffer); + if (readBytes > 0) { + readBuffer.flip(); + byte[] data = new byte[readBuffer.remaining()]; + readBuffer.get(data); + String message = new String(data, "UTF-8").trim(); + System.out.println("客户端收到消息:" + message); + ByteBuffer writeBuffer = ByteBuffer.wrap(message.getBytes());//将消息返回给客户端 + writeBuffer.flip(); + channel.write(writeBuffer); + } else if (readBytes < 0) {//对链路关闭 + key.cancel(); + channel.close(); + } else { + //读到0字节忽略 + } + } + } + + if (key.isConnectable()) { + // a connection was established with a remote server. + + } + + if (key.isWritable()) { + // a channel is ready for writing + } + } + + @Ignore + @Test + public void clientTest() { + NioClientTest nioClient = new NioClientTest("localhost", 9999); + nioClient.init(); + new Thread(nioClient, "clinetThread").start(); + ThreadUtils.sleep(Long.MAX_VALUE); + } + +} diff --git a/net/src/test/java/com/zfoo/net/base/nio/nio/NioServerTest.java b/net/src/test/java/com/zfoo/net/base/nio/nio/NioServerTest.java new file mode 100644 index 00000000..c5ded586 --- /dev/null +++ b/net/src/test/java/com/zfoo/net/base/nio/nio/NioServerTest.java @@ -0,0 +1,141 @@ +package com.zfoo.net.base.nio.nio; + +import com.zfoo.util.ThreadUtils; +import org.junit.Ignore; +import org.junit.Test; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.Date; +import java.util.Iterator; + +public class NioServerTest implements Runnable { + + //多路复用器,一对多 + private Selector selector; + + //1.获得一个ServerSocket通道,用于监听客户端的连接,它是所有客户端连接的父管道 + private ServerSocketChannel serverChannel; + + private volatile boolean stop; + + public void init(int port) { + try { + serverChannel = ServerSocketChannel.open(); + //2.绑定到port端口,设置通道为非阻塞 + //serverChannel.socket().bind(new InetSocketAddress(port)); + serverChannel.socket().bind(new InetSocketAddress("127.0.0.1", port)); + serverChannel.configureBlocking(false); + //3.创建多路复用器 + this.selector = Selector.open(); + //4.多路复用器和通道绑定,并注册绑定事件 + serverChannel.register(selector, SelectionKey.OP_ACCEPT); + } catch (IOException e) { + e.printStackTrace(); + System.out.println("非正常退出"); + System.exit(1); + } + } + + @Override + public void run() { + System.out.println("服务器启动成功!"); + //5.采用轮询的方式监听selector上是否有需要处理的事件(Key) + while (!stop) { + try { + //事件到达selector.open()会返回;否则一直阻塞 + selector.select(); + //获得selector中选中的项的迭代器,选中的项为注册的事件 + Iterator iterator = selector.selectedKeys().iterator(); + while (iterator.hasNext()) { + SelectionKey key = iterator.next(); + iterator.remove();//删除已选的key,防止重复处理 + try { + handleInput(key); + } catch (Exception e) { + if (key != null) { + key.cancel(); + if (key.channel() != null) { + key.channel().close(); + } + } + } + } + } catch (IOException e) { + e.printStackTrace(); + } + } + //多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源 + if (selector != null) { + try { + selector.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + public void handleInput(SelectionKey key) throws IOException { + if (key.isValid()) { + if (key.isAcceptable()) { //客户请求连接事件 + // 6.多路复用器监听到有新的客户端接入,处理新的接入请求,完成TCP三次握手,建立物理连接 + ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel(); + SocketChannel channel = serverChannel.accept(); + //7.设置服务端和客户端的连接为非阻塞模式,获得和客户端连接的通道 + channel.configureBlocking(false); + //给客户端发送消息 + channel.write(ByteBuffer.wrap(new String("Hello client!This is server!").getBytes())); + //8.将客户端链路注册到多路复用器上,和客户端连接成功后,为了接受到客户端的消息,需要给通道设置读权限 + channel.register(selector, SelectionKey.OP_READ); + } + + if (key.isConnectable()) { + // a connection was established with a remote server. + } + + if (key.isReadable()) { + //服务器可读取消息:得到事件发生的Socket通道 + SocketChannel channel = (SocketChannel) key.channel(); + //创建读取的缓冲区 + ByteBuffer readBuffer = ByteBuffer.allocate(1024); + int readBytes = channel.read(readBuffer); + if (readBytes > 0) { + //接受 + readBuffer.flip();//limit=position, position=0 + byte[] data = new byte[readBuffer.remaining()]; + readBuffer.get(data); + String message = new String(data).trim(); + System.out.println("服务器收到消息:" + message); + + //发送 + ByteBuffer writeBuffer = ByteBuffer.wrap(String.format("北京时间:" + new Date().toString()).getBytes());//将消息返回给客户端 + channel.write(writeBuffer); + } else if (readBytes < 0) {//对链路关闭 + key.cancel(); + channel.close(); + } else { + //读到0字节忽略 + } + } + + if (key.isWritable()) { + // a channel is ready for writing + } + } + } + + @Ignore + @Test + public void serverTest() { + NioServerTest nioServer = new NioServerTest(); + nioServer.init(9999); + new Thread(nioServer, "serverThread").start(); + ThreadUtils.sleep(Long.MAX_VALUE); + } + +}