perf[net]: 增加了一大波关于网络的测试用例

This commit is contained in:
jaysunxiao
2021-07-06 12:44:31 +08:00
parent ad01920ddf
commit 499e4f0bee
58 changed files with 2938 additions and 0 deletions
Binary file not shown.

After

Width:  |  Height:  |  Size: 196 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 255 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 189 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 281 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 424 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 224 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 126 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 115 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 108 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 116 KiB

+12
View File
@@ -0,0 +1,12 @@
阻塞和非阻塞是针对内核和上层应用来说的;同步和异步是针对应用和开发者来说的来说的。
![Image text](../image/netty/aio-nodejs-0.png)
![Image text](../image/netty/aio-nodejs-1.png)
![Image text](../image/netty/aio-nodejs-2.png)
![Image text](../image/netty/aio-nodejs-3.png)
![Image text](../image/netty/aio-netty-0.png)
![Image text](../image/netty/aio-netty-1.png)
![Image text](../image/netty/aio-netty-2.png)
![Image text](../image/netty/aio-netty-3.png)
![Image text](../image/netty/aio-netty-4.png)
![Image text](../image/netty/aio-netty-5.png)
+98
View File
@@ -0,0 +1,98 @@
# 一、netty模型
## 1.reactor线程驱动模型
```
bossGroup 线程池则只是在 Bind 某个端口后,获得其中一个线程作为 MainReactor
专门处理端口的 Accept 事件,每个端口对应一个 Boss 线程。
workerGroup 线程池会被各个 SubReactor 和 Worker 线程充分利用。
```
## 2.Netty事件模型
```
核心思路基于主从Reactor模型,一个NioEventLoopGroup包含n个NioEventLoop,
每一个NioEventLoop持有一个Selector和一个线程池(执行器EventExecute,其实是netty ForkJoinPool,并发度为n),
在选择器的NioEventLoop的run方法每次运行后,就会交给NioEventLoop中线程池的另外一个线程。
```
## 3.I/O复用模型
- Netty 的非阻塞 I/O 的实现关键是基于 I/O 复用模型,这里用 Selector 对象表示,基于Java的NIOnetty会有多个Selector
```
DK的NIO类库有一个epoll死循环bug,它会导致Selector空轮询,IO线程CPU达到100%,严重影响系统运行。
Netty的解决策略:
1.对Selector的select操作周期进行统计。
2.每完成一次空的select操作进行一次计数。
3.在某个周期内如果连续N次空轮询,则说明触发了JDK NIO的epoll死循环bug。
4.创建新的Selector,将出现bug的Selector上的channel重新注册到新的Selector上。
5.关闭bug的Selector,使用新的Selector进行替换。
```
# 二、netty设置
## 1.netty有哪些参数设置
1.TCP_NODELAY
解释:是否启用Nagle算法,改算法将小的碎片数据连接成更大的报文来最小化所发送的报文的数量。
使用建议:如果需要发送一些较小的报文,则需要禁用该算法,从而最小化报文传输延时。
只有在网络通信非常大时(通常指已经到100k+/秒了),设置为false会有些许优势,因此建议大部分情况下均应设置为true。
2.SO_REUSEADDR
解释:是否复用处于TIME_WAIT状态连接的端口,适用于有大量处于TIME_WAIT状态连接的场景,如高并发量的Http短连接场景等。
3.SO_SNDBUF
  解释:Socket参数,TCP数据发送缓冲区大小,即TCP发送滑动窗口,linux操作系统可使用命令:cat /proc/sys/net/ipv4/tcp_smem查询其大小。
缓冲区的大小决定了网络通信的吞吐量(网络吞吐量=缓冲区大小/网络时延)。
  使用建议:缓冲区大小设为网络吞吐量达到带宽上限时的值,即缓冲区大小=网络带宽*网络时延。以千兆网卡为例进行计算,假设网络时延为1ms,缓冲区大小=1000Mb/s * 1ms = 128KB。
4.SO_KEEPALIVE
  解释:是否使用TCP的心跳机制;
  使用建议:心跳机制由应用层自己实现;
## 2.netty最佳的Boss线程连接数
```
没有固定的连接数,一切还是看你的场景,连接数在满足传输吞吐量的情况下,越少越好。
2条连接时,只能有40k QPS。
48条连接,升到62k QPSCPU烧了28%
4条连接,QPS反而上升到68k ,而且CPU降到20%。
```
## 3.netty线程池怎么设置
```
Boss Group用于服务端处理建立连接的请求,WorkGroup用于处理I/O。
EventLoopGroup的默认大小都是是2倍的CPU核数,但这并不是一个恒定的最佳数量,为了避免线程上下文切换,只要能满足要求,这个值其实越少越好。
如果都是长连接,Boss Group平时很闲,好在它也只有忙起来才会多起线程,平时就只占1条。
Netty线程的数量一般固定且较少,所以很怕线程被堵塞,比如同步的数据库查询,
比如下游的服务调用(又来罗嗦,future.get()式的异步在执行future.get()时还是堵住当前线程的啊)。
所以,此时就要把处理放到一个业务线程池里操作,即使要付出线程上下文切换的代价
```
## 4.netty的JVM参数设置
```
-Dio.netty.leakDetectionLevel=disabled把检测关掉
堆外内存大小
```
## 5.netty注意事项
```
1. ctx.writeAndFlush() 与 channel.writeAndFlush()的区别在于,channel要经过整条Pipeline,而ctx直接找下一个outboundHandler。
2. channel.writeAndFlush(buf, channel.voidPromise() )
writeAndFlush不管你用不用默认构造返回一个Promise(Future),有点浪费内存。没有用的话,用一个公共的 voidPromise ,减少大家花费。不过低版本的Netty不能用。
3. 空闲连接管理,因为刚才说的ctx.writeAndFlush()不经Pipeline,所以只监控读空闲就够了。否则每次请求都要READ/WRITE/ALL IDEL三个值算一遍,白白消耗。
4. Handler能共用就标上Shareable Annotation然后共用,不要每个Channel建一个。
```
@@ -0,0 +1,57 @@
package com.zfoo.net.base.aio.client;
import com.zfoo.util.ThreadUtils;
import org.junit.Ignore;
import org.junit.Test;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.CountDownLatch;
public class AioClientTest implements Runnable {
private AsynchronousSocketChannel client;
private String host;
private int port;
private CountDownLatch latch;
public AioClientTest(String host, int port, int latchNum) {
this.host = host;
this.port = port;
this.latch = new CountDownLatch(latchNum);
}
public void init() {
try {
client = AsynchronousSocketChannel.open();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
ConnectCompletionHandler handler = new ConnectCompletionHandler(client, latch);
client.connect(new InetSocketAddress(host, port), handler, handler);
try {
latch.await();
client.close();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
@Ignore
@Test
public void clientTest() {
AioClientTest aioClient = new AioClientTest("127.0.0.1", 9999, 1);
aioClient.init();
new Thread(aioClient, "client").start();
ThreadUtils.sleep(Long.MAX_VALUE);
}
}
@@ -0,0 +1,55 @@
package com.zfoo.net.base.aio.client;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;
public class ConnectCompletionHandler implements CompletionHandler<Void, ConnectCompletionHandler> {
private AsynchronousSocketChannel client;
private CountDownLatch latch;
public ConnectCompletionHandler(AsynchronousSocketChannel channel, CountDownLatch latch) {
this.client = channel;
this.latch = latch;
}
@Override
public void completed(Void result, ConnectCompletionHandler attachment) {
byte[] req = "QUERY TIME ORDER!".getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
writeBuffer.put(req);
writeBuffer.flip();
client.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
if (buffer.hasRemaining()) {
client.write(buffer, buffer, this);
} else {
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
client.read(readBuffer, readBuffer, new ReadCompletionHandler(client, latch));
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
@Override
public void failed(Throwable exc, ConnectCompletionHandler attachment) {
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
@@ -0,0 +1,46 @@
package com.zfoo.net.base.aio.client;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;
/**
* Created by Administrator on 2017/5/19 0019.
*/
public class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {
private AsynchronousSocketChannel client;
private CountDownLatch latch;
public ReadCompletionHandler(AsynchronousSocketChannel channel, CountDownLatch latch) {
this.client = channel;
this.latch = latch;
}
@Override
public void completed(Integer result, ByteBuffer attachment) {
attachment.flip();
byte[] bytes = new byte[attachment.remaining()];
attachment.get(bytes);
String body;
try {
body = new String(bytes, "UTF-8");
System.out.println("Now is :" + body);
latch.countDown();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
@@ -0,0 +1,20 @@
package com.zfoo.net.base.aio.server;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
public class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, AioServerTest> {
@Override
public void completed(AsynchronousSocketChannel result, AioServerTest attachment) {
attachment.getAsynchronousServerSocketChannel().accept(attachment, this);
ByteBuffer buffer = ByteBuffer.allocate(1024);
result.read(buffer, buffer, new ReadCompletionHandler(result));
}
@Override
public void failed(Throwable exc, AioServerTest attachment) {
exc.printStackTrace();
attachment.getLatch().countDown();
}
}
@@ -0,0 +1,72 @@
package com.zfoo.net.base.aio.server;
import com.zfoo.util.ThreadUtils;
import org.junit.Ignore;
import org.junit.Test;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.CountDownLatch;
public class AioServerTest implements Runnable {
private int port;
private CountDownLatch latch;
private AsynchronousServerSocketChannel asynchronousServerSocketChannel;
public AioServerTest(int port, int latchNum) {
this.port = port;
latch = new CountDownLatch(latchNum);
}
public void init() {
try {
asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open();
asynchronousServerSocketChannel.bind(new InetSocketAddress(port));
System.out.println("The Time server is start in port:" + port);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
asynchronousServerSocketChannel.accept(this, new AcceptCompletionHandler());
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public AsynchronousServerSocketChannel getAsynchronousServerSocketChannel() {
return asynchronousServerSocketChannel;
}
public void setAsynchronousServerSocketChannel(AsynchronousServerSocketChannel asynchronousServerSocketChannel) {
this.asynchronousServerSocketChannel = asynchronousServerSocketChannel;
}
public CountDownLatch getLatch() {
return latch;
}
public void setLatch(CountDownLatch latch) {
this.latch = latch;
}
@Ignore
@Test
public void serverTest() {
AioServerTest aioServer = new AioServerTest(9999, 1);
aioServer.init();
new Thread(aioServer, "server").start();
ThreadUtils.sleep(Long.MAX_VALUE);
}
}
@@ -0,0 +1,74 @@
package com.zfoo.net.base.aio.server;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.Date;
public class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {
private AsynchronousSocketChannel channel;
public ReadCompletionHandler(AsynchronousSocketChannel channel) {
this.channel = channel;
}
@Override
public void completed(Integer result, ByteBuffer attachment) {
attachment.flip();
byte[] body = new byte[attachment.remaining()];
attachment.get(body);
String req = null;
try {
req = new String(body, "UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
System.out.println("The time server receive order:" + req);
String currentTime = "Current time is " + new Date().toString() + "hello dddddddddd";
doWrite(currentTime);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
this.channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
public void doWrite(String currentTime) {
if (currentTime != null && currentTime.trim().length() > 0) {
byte[] bytes = currentTime.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
channel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
//如果没有发送完成,则继续发送
if (writeBuffer.hasRemaining()) {
channel.write(buffer);
}
}
@Override
public void failed(Throwable exc, ByteBuffer buffer) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
}
}
@@ -0,0 +1,60 @@
package com.zfoo.net.base.bio;
import com.zfoo.protocol.util.IOUtils;
import org.junit.Ignore;
import org.junit.Test;
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
@Ignore
public class TcpTest {
@Test
public void serverTest() throws IOException {
// 1.穿件服务器,指定端口
ServerSocket server = null;
BufferedWriter bufferedWriter = null;
try {
server = new ServerSocket(9999);
// 2.接受客户端连接,阻塞式
Socket socket = server.accept();
System.out.println("hello http!!!!!!");
// 3.发送数据+接受数据
String message = "welocme to internet!!!";
bufferedWriter = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
bufferedWriter.write(message);
bufferedWriter.newLine();
bufferedWriter.flush();
} finally {
IOUtils.closeIO(bufferedWriter, server);
}
}
@Test
public void clientTest() throws IOException {
// 1.创建客户端,必须指定服务器+端口,此时就在连接
Socket client = null;
BufferedReader bufferedReader = null;
try {
// 同一个协议下TCP,端口不能重复,UDP和TCP可以公用一个端口
// 端口两个字节,1024一下为系统级别的端口,不能使用
client = new Socket("localhost", 9999);
// 2.接受数据
bufferedReader = new BufferedReader(new InputStreamReader(client.getInputStream()));
String receiveMes = bufferedReader.readLine();
System.out.println(receiveMes);
// 3.发送数据
client.getOutputStream().write(new String("This is client!").getBytes());
client.getOutputStream().flush();
} finally {
IOUtils.closeIO(client.getOutputStream(), client);
}
}
}
@@ -0,0 +1,60 @@
package com.zfoo.net.base.bio;
import com.zfoo.protocol.util.IOUtils;
import org.junit.Ignore;
import org.junit.Test;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
@Ignore
public class UdpTest {
@Test
public void serverTest() throws IOException {
// 1.创建服务端+端口
DatagramSocket server = null;
try {
server = new DatagramSocket(8000);
// 2.准备接收器
byte[] container = new byte[1024];
// 3.封装成包DatagramPacket
DatagramPacket packet = new DatagramPacket(container,
container.length);
// 4.接收数据,接收数据可以阻塞
server.receive(packet);
// 分析数据
byte[] data = packet.getData();
int length = packet.getLength();
System.out.println(new String(data, 0, length));
} finally {
// 5.释放资源
server.close();
}
}
@Test
public void clientTest() throws IOException {
// 1.创建客户端+端口
DatagramSocket client = null;
try {
client = new DatagramSocket(6666);
// 2.准备数据
String message = "udp 编程";
byte[] data = message.getBytes();
// 3.打包DatagramPacket,发送的地点及端口
DatagramPacket packet = new DatagramPacket(data, data.length,
new InetSocketAddress("localhost", 8888));
// 4.发送
client.send(packet);
// 5.释放资源
client.close();
} finally {
IOUtils.closeIO(client);
}
}
}
@@ -0,0 +1,90 @@
package com.zfoo.net.base.bio;
import com.zfoo.protocol.util.IOUtils;
import org.junit.Ignore;
import org.junit.Test;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URL;
import java.net.UnknownHostException;
@Ignore
public class UrlTest {
// 不包含端口的ip地址
@Test
public void inetAddressTest() throws UnknownHostException {
// 获取本机的ip
InetAddress ipLocal = InetAddress.getLocalHost();
System.out.println("本机地址相关信息:");
System.out.println("本机计算机名称:" + ipLocal.getHostName());
System.out.println("本机ip" + ipLocal.getHostAddress());
System.out.println("**************************************************************************");
// 获取远程ip地址
InetAddress ipBaidu = InetAddress.getByName("www.baidu.com");
// 如果解析不到name和address都为ip
// InetAddress ipBaidu=InetAddress.getByName("115.239.211.112");
System.out.println("远程地址相关信息:");
System.out.println("远程计算机名称:" + ipBaidu.getHostName());
System.out.println("远程ip" + ipBaidu.getHostAddress());
}
// 包含端口的ip地址
@Test
public void inetSocketAddressToInetAddressTest() {
// localhost=127.0.0.1,本机ip
InetSocketAddress address = new InetSocketAddress("www.baidu.com", 9999);
System.out.println(address.getHostName());
System.out.println(address.getPort());
// InetSocketAddress和InetAddress的相互转换
InetAddress ip = address.getAddress();
System.out.println(ip.getHostName());// 计算机名称
System.out.println(ip.getHostAddress());// ip
}
@Test
public void urlTest() {
// URI统一资源标识符,用来唯一的标识一个资源
// URL统一资源定位符,一种具体的URI
// 四部分组成:协议 存放资源的主机名称 端口 资源文件名称
URL url = null;
// 读取网页的资源
InputStream inputStream = null;
InputStreamReader inputStreamReader = null;
BufferedReader bufferedReader = null;
try {
// 绝对路径
url = new URL("https://pic3.zhimg.com/v2-473b053a376f6752da5d5b06a31b7304_b.jpg");
System.out.println(url.getProtocol());
System.out.println(url.getHost());
System.out.println(url.getPort());
System.out.println(url.getFile());// 资源
System.out.println(url.getPath());// 相对路径
System.out.println(url.getRef());// 锚点
System.out.println(url.getQuery());// 存在锚点,参数为null;如果不存在锚点,返回正确
inputStream = url.openStream();
inputStreamReader = new InputStreamReader(inputStream, "utf-8");
bufferedReader = new BufferedReader(inputStreamReader);
String message;
while ((message = bufferedReader.readLine()) != null) {
System.out.println(message);// 输出到控制台
}
} catch (IOException e) {
e.printStackTrace();
} finally {
IOUtils.closeIO(bufferedReader);
}
}
}
@@ -0,0 +1,139 @@
package com.zfoo.net.base.bio.chat;
import com.zfoo.protocol.util.IOUtils;
import com.zfoo.util.ThreadUtils;
import org.junit.Ignore;
import org.junit.Test;
import java.io.*;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.Scanner;
public class BioClientTest {
private String name;
private class SendThread implements Runnable {
// 控制台输入流
private BufferedReader console;
// 管道输出流
private DataOutputStream dataOut;
// 控制线程
private boolean isRunning;
private SendThread() {
console = new BufferedReader(new InputStreamReader(System.in));
isRunning = true;
}
public SendThread(Socket client) {
this();
try {
dataOut = new DataOutputStream(client.getOutputStream());
send(name);
} catch (IOException e) {
e.printStackTrace();
isRunning = false;
IOUtils.closeIO(console, dataOut);
}
}
private String getMessageFromConsole() {
try {
System.out.println("please input message:");
return console.readLine();
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
public void send(String message) {
if (null != message) {
try {
dataOut.writeUTF(message);
dataOut.flush();
} catch (IOException e) {
isRunning = false;
IOUtils.closeIO(dataOut, console);
}
}
}
@Override
public void run() {
while (isRunning) {
send(getMessageFromConsole());
}
}
}
private static class ReceiveThread implements Runnable {
// 输入流
private DataInputStream dataIn;
// 线程标识
private boolean isRunning = true;
public ReceiveThread(Socket client) {
try {
dataIn = new DataInputStream(client.getInputStream());
} catch (IOException e) {
isRunning = false;
IOUtils.closeIO(dataIn);
}
}
private String receive() {
try {
return dataIn.readUTF();
} catch (IOException e) {
isRunning = false;
IOUtils.closeIO(dataIn);
}
return null;
}
@Override
public void run() {
while (isRunning) {
System.out.println(receive());
}
}
}
public void launchClient(String name) {
// 1.创建客户端,必须指定服务器+端口,此时就在连接
Socket client = null;
Thread sendThread = null;
Thread receiveThread = null;
this.name = name;
try {
client = new Socket("localhost", 9999);// 系统自动分配客户端的端口号
// 2.发送数据+接受数据
sendThread = new Thread(new SendThread(client));
receiveThread = new Thread(new ReceiveThread(client));
sendThread.start();
receiveThread.start();
} catch (UnknownHostException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
@Ignore
@Test
public void clientTest() {
System.out.println("请输入名称:");
String name = new Scanner(System.in).nextLine();
new BioClientTest().launchClient(name);
ThreadUtils.sleep(Long.MAX_VALUE);
}
}
@@ -0,0 +1,131 @@
package com.zfoo.net.base.bio.chat;
import com.zfoo.protocol.util.IOUtils;
import com.zfoo.util.ThreadUtils;
import org.junit.Ignore;
import org.junit.Test;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
public class BioServerTest {
private List<ServerChannel> listClient = new ArrayList<ServerChannel>();
private class ServerChannel implements Runnable {
// 发送数据
private DataOutputStream dataOut;
// 接受数据
private DataInputStream dataIn;
private String name;
private boolean isRunning = true;
public ServerChannel(Socket client) {
try {
dataOut = new DataOutputStream(client.getOutputStream());
dataIn = new DataInputStream(client.getInputStream());
this.name = dataIn.readUTF();
System.out.println(name);
send("欢迎你进入聊天室!");
sendToAll(this.name + "进入聊天室!");
} catch (IOException e) {
quit();
}
}
private String receive() {
String message = null;
try {
message = dataIn.readUTF();
} catch (IOException e) {
quit();
}
return message;
}
private void send(String message) {
if (message == null || message.equals(""))
return;
try {
System.out.println(message);
dataOut.writeUTF(message);
dataOut.flush();
} catch (IOException e) {
quit();
}
}
private void sendToAll(String message) {
if (message.startsWith("@") && message.contains(":")) {
String name = message.substring(1, message.indexOf(":"));
String content = message.substring(message.indexOf(":") + 1);
for (ServerChannel other : listClient) {
if (other.name.equals(name)) {
other.send(this.name + "悄悄对你说:" + content);
}
}
} else {
for (ServerChannel other : listClient) {
if (other == this) {
continue;
}
other.send(message);
}
}
}
private void quit() {
IOUtils.closeIO(dataOut);
isRunning = false;
listClient.remove(this);
}
@Override
public void run() {
while (isRunning) {
sendToAll(receive());
}
}
}
public void launchServer() {
// 1.穿件服务器,指定端口
ServerSocket server = null;
try {
server = new ServerSocket(9999);
while (true) {
// 2.接受客户端连接,阻塞式
Socket client = server.accept();
ServerChannel serverChannel = new ServerChannel(client);
// 3.发送数据+接受数据
Thread clienThread = new Thread(serverChannel);
listClient.add(serverChannel);
clienThread.start();
System.out
.println("server started! Connect to client successfully");
}
} catch (IOException e) {
}
}
@Ignore
@Test
public void bioTest() {
new BioServerTest().launchServer();
ThreadUtils.sleep(Long.MAX_VALUE);
}
}
@@ -0,0 +1,42 @@
package com.zfoo.net.base.http;
import com.zfoo.util.ThreadUtils;
import org.junit.Ignore;
import org.junit.Test;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
@Ignore
public class AsyncHttpTest {
@Test
public void test() {
HttpClient client = HttpClient.newBuilder()
.executor(Executors.newSingleThreadExecutor())
.build();
HttpRequest request = HttpRequest
.newBuilder(URI.create("https://www.baidu.com"))
.GET()
.build();
HttpResponse.BodyHandler<String> responseBodyHandler = HttpResponse.BodyHandlers.ofString();
CompletableFuture<HttpResponse<String>> sendAsync = client.sendAsync(request, responseBodyHandler);
sendAsync.thenApply(t -> t.body()).thenAccept(new Consumer<String>() {
@Override
public void accept(String s) {
System.out.println(s);
}
});
ThreadUtils.sleep(Integer.MAX_VALUE);
}
}
@@ -0,0 +1,42 @@
package com.zfoo.net.base.netty.echoserver1;
import com.zfoo.protocol.util.StringUtils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class EchoClientHandler extends ChannelInboundHandlerAdapter {
public static final String ECHO_REQUEST = "Welcome to netty!" + StringUtils.DOLLAR;
private int count;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf firstMessage;
byte[] req = ECHO_REQUEST.getBytes();
for (int i = 0; i < 100; i++) {
ctx.writeAndFlush(Unpooled.copiedBuffer(req));
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String) msg;
System.out.println("This is " + count + " times receive sever :[" + body + "]");
count++;
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
@@ -0,0 +1,59 @@
package com.zfoo.net.base.netty.echoserver1;
import com.zfoo.protocol.util.StringUtils;
import com.zfoo.util.ThreadUtils;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import org.junit.Ignore;
import org.junit.Test;
public class EchoClientTest {
public void connect(int port, String host) {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChildChannelHandler());
ChannelFuture future = null;
try {
future = bootstrap.connect(host, port).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ByteBuf delimiter = Unpooled.copiedBuffer(StringUtils.DOLLAR.getBytes());
channel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
channel.pipeline().addLast(new StringDecoder());
channel.pipeline().addLast(new EchoClientHandler());
}
}
@Ignore
@Test
public void clientTest() {
new EchoClientTest().connect(9999, "127.0.0.1");
System.out.println("hello");
ThreadUtils.sleep(Long.MAX_VALUE);
}
}
@@ -0,0 +1,83 @@
package com.zfoo.net.base.netty.echoserver1;
import com.zfoo.protocol.util.StringUtils;
import com.zfoo.util.ThreadUtils;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.junit.Ignore;
import org.junit.Test;
/**
* 分隔符通信服务器设计
*
* @author SunInsanity
* @version 1.0
* @since 2017 05.22 18:23
*/
public class EchoServerTest {
private int port;
public EchoServerTest(int port) {
this.port = port;
}
public void init() {
//配置服务端nio线程组
EventLoopGroup bossGroup = new NioEventLoopGroup();//服务端接受客户端连接
EventLoopGroup workerGroup = new NioEventLoopGroup();//SocketChannel的网络读写
try {
System.out.println("sdfs");
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100).handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChildChannelHandler());
//绑定端口,同步等待成功
ChannelFuture future = bootstrap.bind(port).sync();
//等待服务端监听端口关闭
future.channel().closeFuture().sync();
System.out.println("sdfs");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//优雅的退出,释放线程池资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ByteBuf delimiter = Unpooled.copiedBuffer(StringUtils.DOLLAR.getBytes());
channel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
channel.pipeline().addLast(new StringDecoder());
channel.pipeline().addLast(new EchoSeverHandler());
}
}
@Ignore
@Test
public void serverTest() {
System.out.println("hello");
EchoServerTest server = new EchoServerTest(9999);
server.init();
System.out.println("hello");
ThreadUtils.sleep(Long.MAX_VALUE);
}
}
@@ -0,0 +1,35 @@
package com.zfoo.net.base.netty.echoserver1;
import com.zfoo.protocol.util.StringUtils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class EchoSeverHandler extends ChannelInboundHandlerAdapter {
private int count;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String) msg;
System.out.println("The is " + count + "times receive client : [" + body + "]");
count++;
body = body + StringUtils.DOLLAR;
ByteBuf writeBuffer = Unpooled.copiedBuffer(body.getBytes());
ctx.writeAndFlush(writeBuffer);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
@@ -0,0 +1,38 @@
package com.zfoo.net.base.netty.echoserver2;
import com.zfoo.protocol.util.StringUtils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class EchoClientHandler extends ChannelInboundHandlerAdapter {
public static final String ECHO_REQUEST = "Welcome to netty!" + StringUtils.DOLLAR;
private int count;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf firstMessage;
byte[] req = ECHO_REQUEST.getBytes();
for (int i = 0; i < 100; i++) {
ctx.writeAndFlush(Unpooled.copiedBuffer(req));
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh");
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
@@ -0,0 +1,54 @@
package com.zfoo.net.base.netty.echoserver2;
import com.zfoo.util.ThreadUtils;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import org.junit.Ignore;
import org.junit.Test;
public class EchoClientTest {
public void connect(int port, String host) {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChildChannelHandler());
ChannelFuture future = null;
try {
future = bootstrap.connect(host, port).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast(new FixedLengthFrameDecoder(20));
channel.pipeline().addLast(new StringDecoder());
channel.pipeline().addLast(new EchoClientHandler());
}
}
@Ignore
@Test
public void clientTest() {
new EchoClientTest().connect(9999, "127.0.0.1");
System.out.println("hello");
ThreadUtils.sleep(Long.MAX_VALUE);
}
}
@@ -0,0 +1,76 @@
package com.zfoo.net.base.netty.echoserver2;
import com.zfoo.util.ThreadUtils;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.junit.Ignore;
import org.junit.Test;
/**
* 定长通信
*
* @author jaysunxiao
* @version 1.0
* @since 2017 05.22 18:23
*/
public class EchoServerTest {
private int port;
public EchoServerTest(int port) {
this.port = port;
}
public void init() {
//配置服务端nio线程组
EventLoopGroup bossGroup = new NioEventLoopGroup();//服务端接受客户端连接
EventLoopGroup workerGroup = new NioEventLoopGroup();//SocketChannel的网络读写
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100).handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChildChannelHandler());
//绑定端口,同步等待成功
ChannelFuture future = bootstrap.bind(port).sync();
//等待服务端监听端口关闭
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//优雅的退出,释放线程池资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast(new FixedLengthFrameDecoder(20));
channel.pipeline().addLast(new StringDecoder());
channel.pipeline().addLast(new EchoSeverHandler());
}
}
@Ignore
@Test
public void serverTest() {
System.out.println("hello");
EchoServerTest server = new EchoServerTest(9999);
server.init();
System.out.println("hello");
ThreadUtils.sleep(Long.MAX_VALUE);
}
}
@@ -0,0 +1,25 @@
package com.zfoo.net.base.netty.echoserver2;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class EchoSeverHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("Server receive client : [" + msg + "]");
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
@@ -0,0 +1,41 @@
package com.zfoo.net.base.netty.nettyfileserver;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* @author jaysunxiao
* @version 1.0
* @since 2017 05.31 10:22
*/
public class ClientHandler extends ChannelInboundHandlerAdapter {
private String filePath;
public ClientHandler(String filePath) {
this.filePath = filePath;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
byte[] req = new String(filePath + System.getProperty("line.separator")).getBytes();
ByteBuf message = Unpooled.buffer(req.length);
message.writeBytes(req);
ctx.writeAndFlush(message);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String) msg;
System.out.println(body);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
@@ -0,0 +1,73 @@
package com.zfoo.net.base.netty.nettyfileserver;
import com.zfoo.util.ThreadUtils;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
import org.junit.Ignore;
import org.junit.Test;
/**
* @author jaysunxiao
* @version 1.0
* @since 2017 05.31 10:22
*/
public class FileClientTest {
private int port;
private String host;
private String filePath;
public FileClientTest(int port, String host, String filePath) {
this.port = port;
this.host = host;
this.filePath = filePath;
}
public void connect() {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChildChannelHandler());
ChannelFuture future = null;
try {
future = bootstrap.connect(host, port).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
channel.pipeline().addLast(new LineBasedFrameDecoder(1024));
channel.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));//三者组合起来就是文本换行编码解码器
channel.pipeline().addLast(new ClientHandler(filePath));
}
}
@Ignore
@Test
public void clientTest() {
FileClientTest client = new FileClientTest(9999, "127.0.0.1", "rainbow.txt");
client.connect();
System.out.println("hello");
ThreadUtils.sleep(Long.MAX_VALUE);
}
}
@@ -0,0 +1,56 @@
package com.zfoo.net.base.netty.nettyfileserver;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.DefaultFileRegion;
import io.netty.channel.FileRegion;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
/**
* @author jaysunxiao
* @version 1.0
* @since 2017 05.31 09:58
*/
public class FileServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println("Server receive client : [" + msg + "]");
File file = new File((String) msg);
if (file.exists() && !file.isFile()) {
ctx.writeAndFlush("not a file:" + file + System.getProperty("line.separator"));
return;
}
ctx.write(file + " " + file.length());
RandomAccessFile randomAccessFile = null;
FileRegion region = null;
try {
randomAccessFile = new RandomAccessFile((String) msg, "r");
region = new DefaultFileRegion(randomAccessFile.getChannel(), 0, randomAccessFile.length());
ctx.write(region);
ctx.writeAndFlush(System.getProperty("line.separator"));
randomAccessFile.close();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
@@ -0,0 +1,79 @@
package com.zfoo.net.base.netty.nettyfileserver;
import com.zfoo.util.ThreadUtils;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.CharsetUtil;
import org.junit.Ignore;
import org.junit.Test;
/**
* 通过netty的通信,和java原生的RandomAccessFile来实现小文件的传输
*
* @author jaysunxiao
* @version 1.0
* @since 2017 05.31 09:49
*/
public class FileServerTest {
private int port;
public FileServerTest(int port) {
this.port = port;
}
public void init() {
//配置服务端nio线程组
EventLoopGroup bossGroup = new NioEventLoopGroup();//服务端接受客户端连接
EventLoopGroup workerGroup = new NioEventLoopGroup();//SocketChannel的网络读写
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100).handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChildChannelHandler());
//绑定端口,同步等待成功
ChannelFuture future = bootstrap.bind(port).sync();
//等待服务端监听端口关闭
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//优雅的退出,释放线程池资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
channel.pipeline().addLast(new LineBasedFrameDecoder(1024));
channel.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));//三者组合起来就是文本换行编码解码器
channel.pipeline().addLast(new FileServerHandler());
}
}
@Ignore
@Test
public void serverTest() {
System.out.println("hello");
FileServerTest server = new FileServerTest(9999);
server.init();
System.out.println("hello");
ThreadUtils.sleep(Long.MAX_VALUE);
}
}
@@ -0,0 +1,36 @@
package com.zfoo.net.base.netty.subscribe;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* @author jaysunxiao
* @version 1.0
* @since 2017 05.23 18:02
*/
public class ClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for (int i = 0; i < 10; i++) {
//*****一定要实现Serializable,不实现不会抛异常,很难查找bug
ctx.write(new SubscribeReq(i, "sun", "nettyBook", "1885630", "shanghai"));
}
ctx.flush();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
@@ -0,0 +1,32 @@
package com.zfoo.net.base.netty.subscribe;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* @author jaysunxiao
* @version 1.0
* @since 2017 05.23 17:13
*/
public class Serverhandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
SubscribeReq req = (SubscribeReq) msg;
System.out.println("Server accept client subscribe req: " + req.toString());
SubscribeResp resp = new SubscribeResp(req.getReqID(), 0, "subscribe successfully!");
//*****一定要实现Serializable,不实现不会抛异常,很难查找bug
ctx.writeAndFlush(resp);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
@@ -0,0 +1,61 @@
package com.zfoo.net.base.netty.subscribe;
import com.zfoo.util.ThreadUtils;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import org.junit.Ignore;
import org.junit.Test;
/**
* @author jaysunxiao
* @version 1.0
* @since 2017 05.23 18:02
*/
public class SubscribeClientTest {
public void connect(int port, String host) {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChildChannelHandler());
ChannelFuture future = null;
try {
future = bootstrap.connect(host, port).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
group.shutdownGracefully();
}
}
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast(new ObjectDecoder(1024 * 1024
, ClassResolvers.cacheDisabled(this.getClass().getClassLoader())));
channel.pipeline().addLast(new ObjectEncoder());
channel.pipeline().addLast(new ClientHandler());
}
}
@Ignore
@Test
public void clientTest() {
new SubscribeClientTest().connect(9999, "127.0.0.1");
System.out.println("hello");
ThreadUtils.sleep(Long.MAX_VALUE);
}
}
@@ -0,0 +1,82 @@
package com.zfoo.net.base.netty.subscribe;
import java.io.Serializable;
/**
* @author jaysunxiao
* @version 1.0
* @since 2017 05.23 16:51
*/
public class SubscribeReq implements Serializable {
private static final long serialVersionUID = 1L;
private int reqID;
private String userName;
private String productName;
private String phoneNumber;
private String address;
public SubscribeReq(int reqID, String userName, String productName, String phoneNumber, String address) {
this.reqID = reqID;
this.userName = userName;
this.productName = productName;
this.phoneNumber = phoneNumber;
this.address = address;
}
public int getReqID() {
return reqID;
}
public void setReqID(int reqID) {
this.reqID = reqID;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public String getProductName() {
return productName;
}
public void setProductName(String productName) {
this.productName = productName;
}
public String getPhoneNumber() {
return phoneNumber;
}
public void setPhoneNumber(String phoneNumber) {
this.phoneNumber = phoneNumber;
}
public String getAddress() {
return address;
}
public void setAddress(String address) {
this.address = address;
}
@Override
public String toString() {
return "SubscribeReq{" +
"reqID=" + reqID +
", userName='" + userName + '\'' +
", productName='" + productName + '\'' +
", phoneNumber='" + phoneNumber + '\'' +
", address='" + address + '\'' +
'}';
}
}
@@ -0,0 +1,58 @@
package com.zfoo.net.base.netty.subscribe;
import java.io.Serializable;
/**
* @author jaysunxiao
* @version 1.0
* @since 2017 05.23 16:52
*/
public class SubscribeResp implements Serializable {
private static final long serialVersionUID = 1L;
private int respID;
private int respCode;
private String desc;
public SubscribeResp(int respID, int respCode, String desc) {
this.respID = respID;
this.respCode = respCode;
this.desc = desc;
}
public int getRespID() {
return respID;
}
public void setRespID(int respID) {
this.respID = respID;
}
public int getRespCode() {
return respCode;
}
public void setRespCode(int respCode) {
this.respCode = respCode;
}
public String getDesc() {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
@Override
public String toString() {
return "SubscribeResp{" +
"respID=" + respID +
", respCode=" + respCode +
", desc='" + desc + '\'' +
'}';
}
}
@@ -0,0 +1,80 @@
package com.zfoo.net.base.netty.subscribe;
import com.zfoo.util.ThreadUtils;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.junit.Ignore;
import org.junit.Test;
/**
* 对象读写序列化通信
*
* @author jaysunxiao
* @version 1.0
* @since 2017 05.23 16:51
*/
public class SubscribeServerTest {
private int port;
public SubscribeServerTest(int port) {
this.port = port;
}
public void init() {
//配置服务端nio线程组
EventLoopGroup bossGroup = new NioEventLoopGroup();//服务端接受客户端连接
EventLoopGroup workerGroup = new NioEventLoopGroup();//SocketChannel的网络读写
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100).handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChildChannelHandler());
//绑定端口,同步等待成功
ChannelFuture future = bootstrap.bind(port).sync();
//等待服务端监听端口关闭
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//优雅的退出,释放线程池资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast(new ObjectDecoder(1024 * 1024
, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
//*****一定要实现Serializable,不实现不会抛异常,很难查找bug
channel.pipeline().addLast(new ObjectEncoder());
channel.pipeline().addLast(new Serverhandler());
}
}
@Ignore
@Test
public void serverTest() {
System.out.println("hello");
SubscribeServerTest server = new SubscribeServerTest(9999);
server.init();
System.out.println("hello");
ThreadUtils.sleep(Long.MAX_VALUE);
}
}
@@ -0,0 +1,39 @@
package com.zfoo.net.base.netty.timeserver1;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
public TimeClientHandler() {
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf firstMessage;
byte[] req = "QUERY TIME ORDER".getBytes();
for (int i = 0; i < 100; i++) {
firstMessage = Unpooled.buffer(req.length);
firstMessage.writeBytes(req);
ctx.writeAndFlush(firstMessage);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf readBuffer = (ByteBuf) msg;
byte[] read = new byte[readBuffer.readableBytes()];
readBuffer.readBytes(read);
String body = new String(read, "UTF-8");
System.out.println("Now is :" + body);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
@@ -0,0 +1,49 @@
package com.zfoo.net.base.netty.timeserver1;
import com.zfoo.util.ThreadUtils;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.junit.Ignore;
import org.junit.Test;
public class TimeClientTest {
public void connect(int port, String host) {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChildChannelHandler());
ChannelFuture future = null;
try {
future = bootstrap.connect(host, port).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast(new TimeClientHandler());
}
}
@Ignore
@Test
public void clientTest() {
new TimeClientTest().connect(9999, "127.0.0.1");
ThreadUtils.sleep(Long.MAX_VALUE);
}
}
@@ -0,0 +1,36 @@
package com.zfoo.net.base.netty.timeserver1;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.Date;
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
private int count;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buffer = (ByteBuf) msg;
byte[] req = new byte[buffer.readableBytes()];
buffer.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println("The time server receive order :" + body + count++);
String currentTime = "QUERY TIME ORDER" + new Date().toString();
ByteBuf writeBuffer = Unpooled.copiedBuffer(currentTime.getBytes());
ctx.writeAndFlush(writeBuffer);
System.out.println(count++);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
@@ -0,0 +1,66 @@
package com.zfoo.net.base.netty.timeserver1;
import com.zfoo.util.ThreadUtils;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.junit.Ignore;
import org.junit.Test;
/**
* 没有考虑到半包读写的服务器
*
* @author jaysunxiao
* @version 1.0
* @since 2017 05.22 18:23
*/
public class TimeServerTest {
private int port;
public TimeServerTest(int port) {
this.port = port;
}
public void init() {
//配置服务端nio线程组
EventLoopGroup bossGroup = new NioEventLoopGroup();//服务端接受客户端连接
EventLoopGroup workerGroup = new NioEventLoopGroup();//SocketChannel的网络读写
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ChildChannelHandler());
//绑定端口,同步等待成功
ChannelFuture future = bootstrap.bind(port).sync();
//等待服务端监听端口关闭
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//优雅的退出,释放线程池资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast(new TimeServerHandler());
}
}
@Ignore
@Test
public void serverTest() {
TimeServerTest server = new TimeServerTest(9999);
server.init();
ThreadUtils.sleep(Long.MAX_VALUE);
}
}
@@ -0,0 +1,36 @@
package com.zfoo.net.base.netty.timeserver2;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
public TimeClientHandler() {
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf firstMessage;
byte[] req = new String("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
for (int i = 0; i < 100; i++) {
firstMessage = Unpooled.buffer(req.length);
firstMessage.writeBytes(req);
ctx.writeAndFlush(firstMessage);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String) msg;
System.out.println("Now is :" + body);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
@@ -0,0 +1,54 @@
package com.zfoo.net.base.netty.timeserver2;
import com.zfoo.util.ThreadUtils;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import org.junit.Ignore;
import org.junit.Test;
public class TimeClientTest {
public void connect(int port, String host) {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChildChannelHandler());
ChannelFuture future = null;
try {
future = bootstrap.connect(host, port).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast(new LineBasedFrameDecoder(1024));
channel.pipeline().addLast(new StringDecoder());
channel.pipeline().addLast(new TimeClientHandler());
}
}
@Ignore
@Test
public void clientTest() {
new TimeClientTest().connect(9999, "127.0.0.1");
System.out.println("hello");
ThreadUtils.sleep(Long.MAX_VALUE);
}
}
@@ -0,0 +1,33 @@
package com.zfoo.net.base.netty.timeserver2;
import com.zfoo.protocol.util.FileUtils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.Date;
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
private int count;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String) msg;
System.out.println("The time server receive order :" + body + count++);
String currentTime = "QUERY TIME ORDER" + new Date().toString() + FileUtils.LS;
ByteBuf writeBuffer = Unpooled.copiedBuffer(currentTime.getBytes());
ctx.writeAndFlush(writeBuffer);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
@@ -0,0 +1,72 @@
package com.zfoo.net.base.netty.timeserver2;
import com.zfoo.util.ThreadUtils;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import org.junit.Ignore;
import org.junit.Test;
/**
* 通过换行符解决半包读写的问题
*
* @author jaysunxiao
* @version 1.0
* @since 2017 05.22 18:23
*/
public class TimeServerTest {
private int port;
public TimeServerTest(int port) {
this.port = port;
}
public void init() {
//配置服务端nio线程组
EventLoopGroup bossGroup = new NioEventLoopGroup();//服务端接受客户端连接
EventLoopGroup workerGroup = new NioEventLoopGroup();//SocketChannel的网络读写
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ChildChannelHandler());
//绑定端口,同步等待成功
ChannelFuture future = bootstrap.bind(port).sync();
//等待服务端监听端口关闭
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//优雅的退出,释放线程池资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast(new LineBasedFrameDecoder(1024));
channel.pipeline().addLast(new StringDecoder());
channel.pipeline().addLast(new TimeServerHandler());
}
}
@Ignore
@Test
public void serverTest() {
System.out.println("hello");
TimeServerTest server = new TimeServerTest(9999);
server.init();
System.out.println("hello");
ThreadUtils.sleep(Long.MAX_VALUE);
}
}
@@ -0,0 +1,31 @@
package com.zfoo.net.base.netty.udpserver;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.CharsetUtil;
/**
* @author jaysunxiao
* @version 1.0
* @since 2017 05.27 17:48
*/
public class UDPClientHandler extends SimpleChannelInboundHandler<DatagramPacket> {
private static String DICTIONARY = "bbb";
@Override
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
String req = packet.content().toString(CharsetUtil.UTF_8);
System.out.println(req);
ctx.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer(DICTIONARY, CharsetUtil.UTF_8)
, packet.sender()));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
cause.printStackTrace();
}
}
@@ -0,0 +1,71 @@
package com.zfoo.net.base.netty.udpserver;
import com.zfoo.util.ThreadUtils;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.util.CharsetUtil;
import org.junit.Ignore;
import org.junit.Test;
import java.net.InetSocketAddress;
/**
* @author jaysunxiao
* @version 1.0
* @since 2017 05.27 17:48
*/
public class UDPClientTest {
private int port;
public UDPClientTest(int port) {
this.port = port;
}
public void init() {
//配置服务端nio线程组
EventLoopGroup group = new NioEventLoopGroup();//服务端接受客户端连接
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, true).handler(new UDPClientHandler());
//绑定端口,同步等待成功
ChannelFuture future = bootstrap.bind(0).sync();
//等待服务端监听端口关闭
Channel channel = future.channel();
//向内网的所有机器广播UDP消息
channel.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer("client"
, CharsetUtil.UTF_8), new InetSocketAddress("127.0.0.1", port))).sync();
if (!channel.closeFuture().await(5000)) {
System.out.println("查询超时!");
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//优雅的退出,释放线程池资源
group.shutdownGracefully();
}
}
@Ignore
@Test
public void clientTest() {
System.out.println("hello");
UDPClientTest server = new UDPClientTest(9999);
server.init();
System.out.println("hello");
ThreadUtils.sleep(Long.MAX_VALUE);
}
}
@@ -0,0 +1,33 @@
package com.zfoo.net.base.netty.udpserver;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.CharsetUtil;
/**
* @author jaysunxiao
* @version 1.0
* @since 2017 05.27 17:27
*/
public class UDPServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {
private static String DICTIONARY = "aaa";
@Override
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
String req = packet.content().toString(CharsetUtil.UTF_8);
System.out.println(req);
if (req.equals("client")) {
ctx.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer(UDPServerHandler.DICTIONARY, CharsetUtil.UTF_8)
, packet.sender()));
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
cause.printStackTrace();
}
}
@@ -0,0 +1,58 @@
package com.zfoo.net.base.netty.udpserver;
import com.zfoo.util.ThreadUtils;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;
import org.junit.Ignore;
import org.junit.Test;
/**
* UDP通信
*
* @author jaysunxiao
* @version 1.0
* @since 2017 05.27 17:24
*/
public class UDPServerTest {
private int port;
public UDPServerTest(int port) {
this.port = port;
}
public void init() {
//配置服务端nio线程组
EventLoopGroup group = new NioEventLoopGroup();//服务端接受客户端连接
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, true).handler(new UDPServerHandler());
//绑定端口,同步等待成功
ChannelFuture future = bootstrap.bind(port).sync();
//等待服务端监听端口关闭
future.channel().closeFuture().await();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//优雅的退出,释放线程池资源
group.shutdownGracefully();
}
}
@Ignore
@Test
public void serverTest() {
System.out.println("hello");
UDPServerTest server = new UDPServerTest(9999);
server.init();
System.out.println("hello");
ThreadUtils.sleep(Long.MAX_VALUE);
}
}
@@ -0,0 +1,47 @@
package com.zfoo.net.base.nio.filechannel;
import com.zfoo.util.ThreadUtils;
import org.junit.Ignore;
import org.junit.Test;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
@Ignore
public class FileReaderTest {
@Test
public void test() throws IOException {
RandomAccessFile file = new RandomAccessFile("rainbow.txt", "rw");
//FileChannel无法设置为非阻塞模式,它总是运行在阻塞模式下
FileChannel fileChannel = file.getChannel();//NIO 中都从一个Channel 开始
//create buffer with capacity of 48 bytes
ByteBuffer buf = ByteBuffer.allocate(1000000000);//数据可以从Channel读到Buffer中,也可以从Buffer 写到Channel中
//read into buffer
int bytesRead = fileChannel.read(buf);
while (bytesRead != -1) {
System.out.println("Read " + bytesRead);
//make buffer ready for read
buf.flip();
while (buf.hasRemaining()) {
System.out.print((char) buf.get());
}
//make buffer ready for writing
buf.clear();
bytesRead = fileChannel.read(buf);
}
fileChannel.close();
file.close();
ThreadUtils.sleep(Long.MAX_VALUE);
}
}
@@ -0,0 +1,71 @@
package com.zfoo.net.base.nio.filechannel;
import com.zfoo.util.ThreadUtils;
import org.junit.Ignore;
import org.junit.Test;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
@Ignore
public class FileWriterTest {
@Test
public void test() throws IOException {
writeToFile();
ThreadUtils.sleep(Long.MAX_VALUE);
}
//fromFile-->toFile
public void transferFrom() throws IOException {
RandomAccessFile fromFile = new RandomAccessFile("fromFile.txt", "rw");
FileChannel fromChannel = fromFile.getChannel();
RandomAccessFile toFile = new RandomAccessFile("toFile.txt", "rw");
FileChannel toChannel = toFile.getChannel();
long position = 0;
long count = fromChannel.size();
toChannel.transferFrom(fromChannel, position, count);
}
//fromFile-->toFile
public void transferTo() throws IOException {
RandomAccessFile fromFile = new RandomAccessFile("fromFile.txt", "rw");
FileChannel fromChannel = fromFile.getChannel();
RandomAccessFile toFile = new RandomAccessFile("toFile.txt", "rw");
FileChannel toChannel = toFile.getChannel();
long position = 0;
long count = fromChannel.size();
fromChannel.transferTo(position, count, toChannel);
}
public void writeToFile() throws IOException {
RandomAccessFile file = new RandomAccessFile("rainbow.txt", "rw");
FileChannel fileChannel = file.getChannel();
String newData = "New String to write to file..." + System.currentTimeMillis();
ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());
buf.flip();
while (buf.hasRemaining()) {
fileChannel.write(buf);
}
fileChannel.close();
file.close();
}
}
@@ -0,0 +1,135 @@
package com.zfoo.net.base.nio.nio;
import com.zfoo.util.ThreadUtils;
import org.junit.Ignore;
import org.junit.Test;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
public class NioClientTest implements Runnable {
private String ip;
private int port;
private Selector selector;
private SocketChannel channel;
private volatile boolean stop;
public NioClientTest(String ip, int port) {
this.ip = ip;
this.port = port;
}
public void init() {
try {
channel = SocketChannel.open();
channel.configureBlocking(false);
channel.connect(new InetSocketAddress(ip, port));
this.selector = Selector.open();
channel.register(selector, SelectionKey.OP_CONNECT);
} catch (IOException e) {
e.printStackTrace();
System.out.println("非正常退出");
System.exit(1);
}
}
@Override
public void run() {
System.out.println("客户端启动成功!");
while (!stop) {
//不会阻塞
try {
selector.select(1001);
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
try {
handleInput(key);
} catch (Exception e) {
if (key != null) {
key.cancel();
if (key.channel() != null) {
key.channel().close();
}
}
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
if (selector != null) {
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void handleInput(SelectionKey key) throws IOException {
if (key.isValid()) {
//连接事件发生
if (key.isConnectable()) {
SocketChannel channel = (SocketChannel) key.channel();
//如果正在连接,则完成连接
if (channel.finishConnect()) {
channel.finishConnect();
}
channel.configureBlocking(false);
channel.write(ByteBuffer.wrap(new String("hello server!this is client!").getBytes()));
channel.register(selector, SelectionKey.OP_READ);
}
if (key.isReadable()) {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = channel.read(readBuffer);
if (readBytes > 0) {
readBuffer.flip();
byte[] data = new byte[readBuffer.remaining()];
readBuffer.get(data);
String message = new String(data, "UTF-8").trim();
System.out.println("客户端收到消息:" + message);
ByteBuffer writeBuffer = ByteBuffer.wrap(message.getBytes());//将消息返回给客户端
writeBuffer.flip();
channel.write(writeBuffer);
} else if (readBytes < 0) {//对链路关闭
key.cancel();
channel.close();
} else {
//读到0字节忽略
}
}
}
if (key.isConnectable()) {
// a connection was established with a remote server.
}
if (key.isWritable()) {
// a channel is ready for writing
}
}
@Ignore
@Test
public void clientTest() {
NioClientTest nioClient = new NioClientTest("localhost", 9999);
nioClient.init();
new Thread(nioClient, "clinetThread").start();
ThreadUtils.sleep(Long.MAX_VALUE);
}
}
@@ -0,0 +1,141 @@
package com.zfoo.net.base.nio.nio;
import com.zfoo.util.ThreadUtils;
import org.junit.Ignore;
import org.junit.Test;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Iterator;
public class NioServerTest implements Runnable {
//多路复用器,一对多
private Selector selector;
//1.获得一个ServerSocket通道,用于监听客户端的连接,它是所有客户端连接的父管道
private ServerSocketChannel serverChannel;
private volatile boolean stop;
public void init(int port) {
try {
serverChannel = ServerSocketChannel.open();
//2.绑定到port端口,设置通道为非阻塞
//serverChannel.socket().bind(new InetSocketAddress(port));
serverChannel.socket().bind(new InetSocketAddress("127.0.0.1", port));
serverChannel.configureBlocking(false);
//3.创建多路复用器
this.selector = Selector.open();
//4.多路复用器和通道绑定,并注册绑定事件
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
e.printStackTrace();
System.out.println("非正常退出");
System.exit(1);
}
}
@Override
public void run() {
System.out.println("服务器启动成功!");
//5.采用轮询的方式监听selector上是否有需要处理的事件(Key)
while (!stop) {
try {
//事件到达selector.open()会返回;否则一直阻塞
selector.select();
//获得selector中选中的项的迭代器,选中的项为注册的事件
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();//删除已选的key,防止重复处理
try {
handleInput(key);
} catch (Exception e) {
if (key != null) {
key.cancel();
if (key.channel() != null) {
key.channel().close();
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
//多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
if (selector != null) {
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void handleInput(SelectionKey key) throws IOException {
if (key.isValid()) {
if (key.isAcceptable()) { //客户请求连接事件
// 6.多路复用器监听到有新的客户端接入,处理新的接入请求,完成TCP三次握手,建立物理连接
ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
SocketChannel channel = serverChannel.accept();
//7.设置服务端和客户端的连接为非阻塞模式,获得和客户端连接的通道
channel.configureBlocking(false);
//给客户端发送消息
channel.write(ByteBuffer.wrap(new String("Hello client!This is server!").getBytes()));
//8.将客户端链路注册到多路复用器上,和客户端连接成功后,为了接受到客户端的消息,需要给通道设置读权限
channel.register(selector, SelectionKey.OP_READ);
}
if (key.isConnectable()) {
// a connection was established with a remote server.
}
if (key.isReadable()) {
//服务器可读取消息:得到事件发生的Socket通道
SocketChannel channel = (SocketChannel) key.channel();
//创建读取的缓冲区
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = channel.read(readBuffer);
if (readBytes > 0) {
//接受
readBuffer.flip();//limit=position, position=0
byte[] data = new byte[readBuffer.remaining()];
readBuffer.get(data);
String message = new String(data).trim();
System.out.println("服务器收到消息:" + message);
//发送
ByteBuffer writeBuffer = ByteBuffer.wrap(String.format("北京时间:" + new Date().toString()).getBytes());//将消息返回给客户端
channel.write(writeBuffer);
} else if (readBytes < 0) {//对链路关闭
key.cancel();
channel.close();
} else {
//读到0字节忽略
}
}
if (key.isWritable()) {
// a channel is ready for writing
}
}
}
@Ignore
@Test
public void serverTest() {
NioServerTest nioServer = new NioServerTest();
nioServer.init(9999);
new Thread(nioServer, "serverThread").start();
ThreadUtils.sleep(Long.MAX_VALUE);
}
}