Netty基础学习(二)NIO 基本知识
Netty官网:Netty: Home参考书籍:《Netty权威指南》Linux的内核将所有外部设备都看做一个文件来操作,对一个文件的读写操作会调用内核提供的系统命令,返回一个file descriptor(fd,文件描述符)。而对一个socket的读写也会有相应的描述符,称为socketfd(socket描述符),描述符就是一个数字,它指向内核中的一个结构体(文件路径,数据区等一些属性)。阻塞式
目录
Netty官网:Netty: Home
参考书籍:《Netty权威指南》
一 Linux网络IO模型
Linux的内核将所有外部设备都看做一个文件来操作,对一个文件的读写操作会调用内核提供的系统命令,返回一个file descriptor(fd,文件描述符)。
而对一个socket的读写也会有相应的描述符,称为socketfd(socket描述符),描述符就是一个数字,它指向内核中的一个结构体(文件路径,数据区等一些属性)。
1.1 阻塞式模型
阻塞式I/O是最简单的网络I/O模型,其特点是在执行I/O操作时,进程会被阻塞,直到I/O操作完成并返回结果。在这种模型下,进程不能同时处理多个连接或数据流,因为只有在当前I/O操作完成后,才能进行下一个I/O操作,阻塞式I/O适用于单线程、单连接的场景,实现简单,但存在I/O效率低、资源利用率低等问题。
1.2 非阻塞模型
在非阻塞式I/O模型中,进程在执行I/O操作时,不会被阻塞,而是立即返回,即使I/O操作未完成也会返回一个错误码。这样,进程可以同时处理多个连接或数据流,但需要不断地轮询I/O操作的状态,从而增加了CPU负担。非阻塞式I/O适用于单线程、多连接的场景,但需要在程序中不断地检查I/O状态,实现相对复杂。
1.3 多路复用模型
多路复用I/O模型利用了Linux提供的select/poll/epoll等机制,将多个I/O操作封装在一个函数调用中,等待任意一个I/O操作完成并返回。这种模型可以同时处理多个连接或数据流,而且不需要轮询I/O状态,因此CPU利用率更高,效率更高。多路复用I/O适用于多线程、多连接的场景,但需要维护一个I/O事件集合,实现相对复杂。
1.4 异步模型
异步I/O模型在发起I/O操作后,不需要进程等待操作完成,而是通过回调函数或信号等方式通知进程操作已完成。这种模型可以同时处理多个连接或数据流,而且不需要轮询I/O状态,因此CPU利用率更高,效率更高。异步I/O适用于高并发、多连接的场景,但需要实现相对复杂。
1.5 信号驱动模型
信号驱动I/O模型是一种I/O处理模型,通常用于Unix和Linux操作系统中。在这种模型中,应用程序不断地执行select()系统调用,以等待数据从一个或多个文件描述符中变得可用。
一旦文件描述符中的数据可用,select()调用将返回,并将控制权返回给应用程序。然后,应用程序可以使用标准的I/O系统调用(如read()和write())来读取或写入数据。
二 几种通信模型
2.1 BIO模型
采用BIO通信模型的服务端,通常由一个独立的Acceptor线程负责监听客户端的连接,它接收到客户端连接请求之后为每个客户端创建一个新的线程进行链路处理,处理完成之后,通过输出流返回应答给客户端,线程销毁。
🌈🌈模型结构
🌈🌈服务端
package com.shu.bio;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
/**
* @description:
* TimeService根据传入的参数设置监听端口,如果没有入参,使用默认值8080,
* 通过构造函数创建ServerSocket,如果端口合法且没有被占用,服务端监听成功。
* @author: shu
* @createDate: 2023/4/24 10:22
* @version: 1.0
*/
public class TimeServer {
public static void main(String[] args) throws IOException {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.parseInt(args[0]);
} catch (NumberFormatException e) {
// 采用默认值
}
}
ServerSocket server = null;
try{
server = new ServerSocket(port);
System.out.println("The time server is start in port : " + port);
Socket socket = null;
while (true) {
socket = server.accept();
new Thread(new TimeServerHandler(socket)).start();
Thread.sleep(1000);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (server != null) {
System.out.println("The time server close");
server.close();
server = null;
}
}
}
}
package com.shu.bio;
import java.io.BufferedReader;
import java.io.PrintStream;
import java.net.ServerSocket;
import java.net.Socket;
/**
* @description:
* @author: shu
* @createDate: 2023/4/24 10:26
* @version: 1.0
*/
public class TimeServerHandler implements Runnable{
private Socket socket;
public TimeServerHandler(Socket serverSocket) {
this.socket = serverSocket;
}
/**
* When an object implementing interface <code>Runnable</code> is used
* to create a thread, starting the thread causes the object's
* <code>run</code> method to be called in that separately executing
* thread.
* <p>
* The general contract of the method <code>run</code> is that it may
* take any action whatsoever.
*
* @see Thread#run()
*/
@Override
public void run() {
// TODO Auto-generated method stub
BufferedReader in = null;
PrintStream out = null;
try {
in = new BufferedReader(new java.io.InputStreamReader(socket.getInputStream()));
out = new PrintStream(socket.getOutputStream());
String currentTime = null;
String body = null;
while (true) {
body = in.readLine();
if (body == null) {
break;
}
System.out.println("The time server receive order : " + body);
currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(System.currentTimeMillis()).toString() : "BAD ORDER";
out.println(currentTime);
}
} catch (Exception e) {
System.out.println("TimeServerHandler run error"+e.getMessage());
}
}
}
🌈🌈客户端
package com.shu.bio;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
/**
* @description: BIO客户端
* @author: shu
* @createDate: 2023/4/24 10:30
* @version: 1.0
*/
public class TimeClient {
public static void main(String[] args) {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.parseInt(args[0]);
} catch (NumberFormatException e) {
// 采用默认值
}
}
Socket socket = null;
BufferedReader in = null;
PrintWriter out = null;
try {
socket = new Socket("127.0.0.1", port);
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out = new PrintWriter(socket.getOutputStream(), true);
out.println("QUERY TIME ORDER");
System.out.println("Send order 2 server succeed.");
String resp = in.readLine();
System.out.println("Now is : " + resp);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (in != null) {
try {
in.close();
} catch (Exception e2) {
e2.printStackTrace();
}
}
if (out != null) {
out.close();
out = null;
}
if (socket != null) {
try {
socket.close();
} catch (Exception e2) {
e2.printStackTrace();
}
socket = null;
}
}
}
}
🌈🌈测试
🌈🌈问题
该模型最大的问题就是缺乏弹性伸缩能力,当客户端并发访问量增加后,服务端的线程个数和客户端并发访问数呈1:1的正比关系,由于线程是Java虚拟机非常宝贵的系统资源,当线程数膨胀之后,系统的性能将急剧下降,随着并发访问量的继续增大,系统会发生线程堆栈溢出、创建新线程失败等问题,并最终导致进程宕机或者僵死,不能对外提供服务,下面我们模拟100个客服端来测试?
package com.shu.bio;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
/**
* @description:
* @author: shu
* @createDate: 2023/4/24 10:34
* @version: 1.0
*/
public class TimeMoreClient {
public static void main(String[] args) {
// 模拟100个客户端
for (int i = 0; i < 100; i++) {
new Thread(new Runnable() {
@Override
public void run() {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.parseInt(args[0]);
} catch (NumberFormatException e) {
// 采用默认值
}
}
Socket socket = null;
BufferedReader in = null;
PrintWriter out = null;
try {
socket = new Socket("", port);
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out = new PrintWriter(socket.getOutputStream(), true);
out.println(Thread.currentThread().getName()+" QUERY TIME ORDER");
System.out.println(Thread.currentThread().getName()+" Send order 2 server succeed.");
String resp = in.readLine();
System.out.println(Thread.currentThread().getName()+" Now is : " + resp);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (in != null) {
try {
in.close();
} catch (Exception e2) {
e2.printStackTrace();
}
}
if (out != null) {
out.close();
out = null;
}
if (socket != null) {
try {
socket.close();
} catch (Exception e2) {
e2.printStackTrace();
}
socket = null;
}
}
}
}
).start();
}
}
}
🌈🌈观察
🌈🌈结论
- BIO主要的问题在于每当有一个新的客户端请求接入时,服务端必须创建一个新的线程处理新接入的客户端链路,一个线程只能处理一个客户端连接。
- 在高性能服务器应用领域,往往需要面向成千上万个客户端的并发连接,这种模型显然无法满足高性能、高并发接入的场景。
- 注意:并不说他没有应用场景
2.2 伪异步IO模型
最初为了解决这种问题,我们利用线程池来达到解决问题的办法,但是这也是杯水车薪,下面我们来看看这种方法吧,线程池的知识自己去百度吧
当有新的客户端接入的时候,将客户端的Socket封装成一个Task(该任务实现java.lang.Runnable接口)投递到后端的线程池中进行处理,JDK的线程池维护一个消息队列和N个活跃线程对消息队列中的任务进行处理。
🌈🌈模型
🌈🌈服务端
package com.shu.aio;
import com.shu.bio.TimeServerHandler;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
/**
* @description: 利用线程池改造TimeServer
* @author: shu
* @createDate: 2023/4/24 10:58
* @version: 1.0
*/
public class TimeServer {
public static void main(String[] args) throws IOException {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.parseInt(args[0]);
} catch (NumberFormatException e) {
// 采用默认值
}
}
ServerSocket server = null;
try{
server = new ServerSocket(port);
System.out.println("The time server is start in port : " + port);
TimeServerHandlerExecutePool singleExecutor = new TimeServerHandlerExecutePool(50, 10000);
Socket socket = null;
while (true) {
socket = server.accept();
singleExecutor.execute(new TimeServerHandler(socket));
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (server != null) {
System.out.println("The time server close");
server.close();
server = null;
}
}
}
}
package com.shu.aio;
import java.util.concurrent.*;
/**
* @description:
* @author: shu
* @createDate: 2023/4/24 10:59
* @version: 1.0
*/
public class TimeServerHandlerExecutePool {
/**
* 线程池
*/
private ExecutorService executor;
/**
* @param maxPoolSize 最大线程数
* @param queueSize 任务队列大小
*/
public TimeServerHandlerExecutePool(int maxPoolSize, int queueSize) {
// 这里自己来实现线程池
executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
maxPoolSize,
120L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(queueSize));
}
/**
* 执行任务
* @param task
*/
public void execute(Runnable task) {
executor.execute(task);
}
}
🌈🌈客户端
package com.shu.aio;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
/**
* @description:
* @author: shu
* @createDate: 2023/4/24 11:17
* @version: 1.0
*/
public class TimeClient {
public static void main(String[] args) {
/**
* 1.创建客户端Socket,指定服务器地址和端口
*/
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.parseInt(args[0]);
} catch (NumberFormatException e) {
// 采用默认值
}
}
Socket socket = null;
BufferedReader in = null;
PrintWriter out = null;
try {
socket = new Socket("127.0.0.1", port);
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out = new PrintWriter(socket.getOutputStream(), true);
out.println("QUERY TIME ORDER");
System.out.println("Send order 2 server succeed.");
String resp = in.readLine();
System.out.println("Now is : " + resp);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (in != null) {
try {
in.close();
} catch (Exception e2) {
e2.printStackTrace();
}
}
if (out != null) {
out.close();
out = null;
}
if (socket != null) {
try {
socket.close();
} catch (Exception e2) {
e2.printStackTrace();
}
socket = null;
}
}
}
}
🌈🌈测试
🌈🌈结论
伪异步I/O通信框架采用了线程池实现,因此避免了为每个请求都创建一个独立线程造成的线程资源耗尽问题,但是由于它底层的通信依然采用同步阻塞模型,因此无法从根本上解决问题。
🌈🌈问题
- 服务端处理缓慢,返回应答消息耗费60s,平时只需要10ms。
- 采用伪异步I/O的线程正在读取故障服务节点的响应,由于读取输入流是阻塞的,因此,它将会被同步阻塞60s。
- 假如所有的可用线程都被故障服务器阻塞,那后续所有的I/O消息都将在队列中排队。
- 由于线程池采用阻塞队列实现,当队列积满之后,后续入队列的操作将被阻塞。
- 由于前端只有一个Accptor线程接收客户端接入,它被阻塞在线程池的同步阻塞队列之后,新的客户端请求消息将被拒绝,客户端会发生大量的连接超时。
2.3 AIO模型
NIO2.0的异步套接字通道是真正的异步非阻塞I/O,它对应UNIX网络编程中的事件驱动I/O(AIO),它不需要通过多路复用器(Selector)对注册的通道进行轮询操作即可实现异步读写,从而简化了NIO的编程模型。
🌈服务端
package com.shu.asyn;
/**
* @description:
* @author: shu
* @createDate: 2023/4/25 10:11
* @version: 1.0
*/
public class TimeServer {
public static void main(String[] args) {
int port = 8081;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// 采用默认值
}
}
AsyncTimeServerHandler timeServer = new AsyncTimeServerHandler(port);
new Thread(timeServer, "AIO-AsyncTimeServerHandler-001").start();
}
}
package com.shu.asyn;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.CountDownLatch;
/**
* @description:
* @author: shu
* @createDate: 2023/4/25 10:11
* @version: 1.0
*/
public class AsyncTimeServerHandler implements Runnable{
private int port;
CountDownLatch latch;
AsynchronousServerSocketChannel asynchronousServerSocketChannel;
public AsyncTimeServerHandler(int port) {
this.port = port;
try{
asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open();
asynchronousServerSocketChannel.bind(new InetSocketAddress(port));
System.out.println("The time server is start in port : " + port);
}catch (Exception e){
e.printStackTrace();
}
}
/**
* When an object implementing interface <code>Runnable</code> is used
* to create a thread, starting the thread causes the object's
* <code>run</code> method to be called in that separately executing
* thread.
* <p>
* The general contract of the method <code>run</code> is that it may
* take any action whatsoever.
*
* @see Thread#run()
*/
@Override
public void run() {
// TODO Auto-generated method stub
latch = new CountDownLatch(1);
doAccept();
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void doAccept(){
asynchronousServerSocketChannel.accept(this, new AcceptCompletionHandler());
}
}
package com.shu.asyn;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
/**
* @description:
* @author: shu
* @createDate: 2023/4/25 10:18
* @version: 1.0
*/
public class AcceptCompletionHandler implements
CompletionHandler<AsynchronousSocketChannel, AsyncTimeServerHandler>
{
/**
* Invoked when an operation has completed.
*
* @param result The result of the I/O operation.
* @param attachment The object attached to the I/O operation when it was initiated.
*/
@Override
public void completed(AsynchronousSocketChannel result, AsyncTimeServerHandler attachment) {
attachment.asynchronousServerSocketChannel.accept(attachment, this);
ByteBuffer buffer = ByteBuffer.allocate(1024);
result.read(buffer, buffer, new ReadCompletionHandler(result));
}
/**
* Invoked when an operation fails.
*
* @param exc The exception to indicate why the I/O operation failed
* @param attachment The object attached to the I/O operation when it was initiated.
*/
@Override
public void failed(Throwable exc, AsyncTimeServerHandler attachment) {
exc.printStackTrace();
attachment.latch.countDown();
}
}
🌈🌈客户端
package com.shu.asyn;
/**
* @description:
* @author: shu
* @createDate: 2023/4/25 10:28
* @version: 1.0
*/
public class TimeClient {
public static void main(String[] args) {
int port = 8081;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
}
}
new Thread(new AsyncTimeClientHandler("127.0.0.1", port), "AIO-AsyncTimeClientHandler-001").start();
}
}
package com.shu.asyn;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;
/**
* @description:
* @author: shu
* @createDate: 2023/4/25 10:28
* @version: 1.0
*/
public class AsyncTimeClientHandler implements CompletionHandler<Void, AsyncTimeClientHandler>, Runnable {
private AsynchronousSocketChannel client;
private String host;
private int port;
private CountDownLatch latch;
public AsyncTimeClientHandler(String host, int port) {
this.host = host;
this.port = port;
try {
client = AsynchronousSocketChannel.open();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
latch = new CountDownLatch(1);
client.connect(new InetSocketAddress(host, port), this, this);
try {
latch.await();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void completed(Void result, AsyncTimeClientHandler attachment) {
byte[] req = "QUERY TIME ORDER".getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
writeBuffer.put(req);
writeBuffer.flip();
client.write(writeBuffer, writeBuffer,
new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
if (buffer.hasRemaining()) {
client.write(buffer, buffer, this);
} else {
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
client.read(
readBuffer,
readBuffer,
new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result,
ByteBuffer buffer) {
buffer.flip();
byte[] bytes = new byte[buffer
.remaining()];
buffer.get(bytes);
String body;
try {
body = new String(bytes,
"UTF-8");
System.out.println("Now is : "
+ body);
latch.countDown();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc,
ByteBuffer attachment) {
try {
client.close();
latch.countDown();
} catch (IOException e) {
// ingnore on close
}
}
});
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
client.close();
latch.countDown();
} catch (IOException e) {
// ingnore on close
}
}
});
}
@Override
public void failed(Throwable exc, AsyncTimeClientHandler attachment) {
exc.printStackTrace();
try {
client.close();
latch.countDown();
} catch (IOException e) {
e.printStackTrace();
}
}
}
package com.shu.asyn;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
/**
* @description:
* @author: shu
* @createDate: 2023/4/25 10:19
* @version: 1.0
*/
public class ReadCompletionHandler implements
CompletionHandler<Integer, ByteBuffer>
{
private AsynchronousSocketChannel channel;
public ReadCompletionHandler(AsynchronousSocketChannel channel) {
if (this.channel == null)
this.channel = channel;
}
/**
* Invoked when an operation has completed.
*
* @param result The result of the I/O operation.
* @param attachment The object attached to the I/O operation when it was initiated.
*/
@Override
public void completed(Integer result, ByteBuffer attachment) {
attachment.flip();
byte[] body = new byte[attachment.remaining()];
attachment.get(body);
try {
String req = new String(body, "UTF-8");
System.out.println("The time server receive order : " + req);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(req) ? new java.util.Date(
System.currentTimeMillis()).toString() : "BAD ORDER";
doWrite(currentTime);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
private void doWrite(String currentTime) {
if (currentTime != null && currentTime.trim().length() > 0) {
byte[] bytes = (currentTime).getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
channel.write(writeBuffer, writeBuffer,
new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
// 如果没有发送完成,继续发送
if (buffer.hasRemaining())
channel.write(buffer, buffer, this);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
channel.close();
} catch (IOException e) {
// ingnore on close
}
}
});
}
}
/**
* Invoked when an operation fails.
*
* @param exc The exception to indicate why the I/O operation failed
* @param attachment The object attached to the I/O operation when it was initiated.
*/
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
this.channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
🌈🌈测试
这种方式的开发难度比较大,我就不详细介绍了,有兴趣的话自己去百度
2.4 NIO模型
Java NIO(New IO)是从 Java 1.4 版本开始引入的一个新的 IO API,可以替代标准的 Java IO API。NIO 与原来的 IO 有同样的作用和目的,但是使用方式完全不同,NIO 支持面向缓冲区的、基于通道的 IO 操作。NIO 将以更加高效的方式进行文件的读写操作。
🌈🌈服务端
package com.shu.nio;
/**
* @description: NIO时间服务器
* @author: shu
* @createDate: 2023/4/24 14:38
* @version: 1.0
*/
public class TimeServer {
public static void main(String[] args) {
int port = 8081;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// 采用默认值
}
}
MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);
new Thread(timeServer, "NIO-MultiplexerTimeServer-001").start();
}
}
package com.shu.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;
/**
* @description: NIO时间服务器服务端
* @author: shu
* @createDate: 2023/4/24 14:40
* @version: 1.0
*/
public class MultiplexerTimeServer implements Runnable {
private Selector selector;
private ServerSocketChannel servChannel;
private volatile boolean stop;
/**
* 初始化多路复用器、绑定监听端口
*
* @param port
*/
public MultiplexerTimeServer(int port) {
try {
selector = Selector.open();
servChannel = ServerSocketChannel.open();
servChannel.configureBlocking(false);
servChannel.socket().bind(new InetSocketAddress(port), 1024);
servChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("The time server is start in port : " + port);
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}
public void stop() {
this.stop = true;
}
@Override
public void run() {
while (!stop) {
try {
selector.select(1000);
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
SelectionKey key = null;
while (it.hasNext()) {
key = it.next();
it.remove();
try {
handleInput(key);
} catch (Exception e) {
if (key != null) {
key.cancel();
if (key.channel() != null) {
key.channel().close();
}
}
}
}
} catch (Throwable t) {
t.printStackTrace();
}
}
// 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
if (selector != null) {
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void handleInput(SelectionKey key) throws IOException {
if (key.isValid()) {
// 处理新接入的请求消息
if (key.isAcceptable()) {
// Accept the new connection
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
// Add the new connection to the selector
sc.register(selector, SelectionKey.OP_READ);
}
if (key.isReadable()) {
// Read the data
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
if (readBytes > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes, "UTF-8");
System.out.println("The time server receive order : "
+ body);
String currentTime = "QUERY TIME ORDER"
.equalsIgnoreCase(body) ? new java.util.Date(
System.currentTimeMillis()).toString()
: "BAD ORDER";
doWrite(sc, currentTime);
} else if (readBytes < 0) {
// 对端链路关闭
key.cancel();
sc.close();
} else
; // 读到0字节,忽略
}
}
}
private void doWrite(SocketChannel channel, String response)
throws IOException {
if (response != null && response.trim().length() > 0) {
byte[] bytes = response.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
channel.write(writeBuffer);
}
}
}
🌈🌈客户端
package com.shu.nio;
/**
* @description: NIO时间客户端
* @author: shu
* @createDate: 2023/4/24 16:49
* @version: 1.0
*/
public class TimeClient {
public static void main(String[] args) {
int port = 8081;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// 采用默认值
}
}
new Thread(new TimeClientHandle("127.0.0.1", port), "TimeClient-001").start();
}
}
package com.shu.nio;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* @description:
* @author: shu
* @createDate: 2023/4/24 16:50
* @version: 1.0
*/
public class TimeClientHandle implements Runnable{
private String host;
private int port;
private Selector selector;
private SocketChannel socketChannel;
private volatile boolean stop;
public TimeClientHandle(String host, int port) {
this.host = host;
this.port = port;
try{
selector = Selector.open();
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
}
catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
}
/**
* When an object implementing interface <code>Runnable</code> is used
* to create a thread, starting the thread causes the object's
* <code>run</code> method to be called in that separately executing
* thread.
* <p>
* The general contract of the method <code>run</code> is that it may
* take any action whatsoever.
*
* @see Thread#run()
*/
@Override
public void run() {
try{
doConnect();
}catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
while (!stop) {
try{
selector.select(1000);
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
SelectionKey key = null;
while (iterator.hasNext()) {
key = iterator.next();
iterator.remove();
try{
handleInput(key);
}catch (Exception e) {
if (key != null) {
key.cancel();
if (key.channel() != null) {
key.channel().close();
}
}
}
}
}catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
}
if (selector != null) {
try{
selector.close();
}catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 处理服务响应
* @param key
* @throws Exception
*/
public void handleInput(SelectionKey key) throws Exception{
if (key.isValid()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
// 判断是否连接成功
if (key.isConnectable()) {
if (socketChannel.finishConnect()) {
System.out.println("连接成功");
socketChannel.register(selector, SelectionKey.OP_READ);
doWrite(socketChannel);
}
else {
System.out.println("连接失败");
System.exit(1);
}
}
if (key.isReadable()) {
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = socketChannel.read(readBuffer);
if (readBytes > 0 ) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String (bytes, "UTF-8");
System.out.println("Now is: " + body);
this.stop = true;
}
else if (readBytes < 0) {
key.cancel();
socketChannel.close();
}
else {
;
}
}
}
}
/**
* 获取服务端响应
* @throws Exception
*/
public void doConnect() throws Exception{
if (socketChannel.connect(new InetSocketAddress(host, port))) {
socketChannel.register(selector, SelectionKey.OP_READ);
doWrite(socketChannel);
}
else {
socketChannel.register(selector, SelectionKey.OP_CONNECT);
}
}
/**
* 写数据给服务端
* @param socketChannel
* @throws Exception
*/
public void doWrite(SocketChannel socketChannel) throws Exception{
byte[] req = "QUERY TIME ORDER".getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
writeBuffer.put(req);
writeBuffer.flip();
socketChannel.write(writeBuffer);
if (!writeBuffer.hasRemaining()) {
System.out.println("Send order 2 server succeed.");
}
}
}
🌈🌈测试
下面我们将来详解介绍NIO的基本知识,这也是为了学习Netty的基础,最后为几种模型做下比较
三 NIO基础知识
Java NIO系统的核心在于:通道(Channel)和缓冲区(Buffer),通道表示打开到 IO 设备(例如:文件、套接字)的连接,若需要使用 NIO 系统,需要获取用于连接 IO 设备的通道以及用于容纳数据的缓冲区,然后操作缓冲区,对数据进行处理,简而言之,通道负责传输,缓冲区负责存储
简单来理解,缓冲区(Buffer)是载体,通道(Channel)是方式,简而言之,Channel 负责传输,Buffer 负责存储
3.1 Buffer
缓冲区(Buffer):一个用于特定基本数据类型的容器,由 java.nio 包定义的,所有缓冲区都是 Buffer 抽象类的子类。
🌈🌈类型
缓冲区(Buffer):在 Java NIO 中负责数据的存取,缓冲区就是数组,用于存储不同类型的数据。根据数据类型的不同(boolean 除外),提供了相应类型的缓冲区:
ByteBuffer,CharBuffer,ShortBuffer,IntBuffer,LongBuffer,FloatBuffer
,DoubleBuffer,上述缓冲区管理方式几乎一致,都是通过 allocate() 来获取缓冲区
但是我们最常用的是ByteBuffer
🌈🌈存取方法
- put():存入数据到缓冲区中
- get():获取缓冲区中的数据
🌈🌈缓存区的核心属性
- capacity: 容量,表示缓冲区中最大存储数据的容量。一旦声明不能更改。
- limit: 界限,表示缓冲区中可以操作数据的大小。(limit 后的数据不能进行读写)
- position: 位置,表示缓冲区中正在操作数据的位置。
- mark: 标记,表示记录当前 position 的位置。可以通过 reset() 恢复到 mark 的位置。
- flip(): 调整模式,读写模式切换
0 <= mark <= position <= limit <= capacity
package com.shu.nio;
import java.nio.ByteBuffer;
import java.nio.IntBuffer;
/**
* @description:
* @author: shu
* @createDate: 2023/4/24 14:18
* @version: 1.0
*/
public class BufferTest {
public static void main(String[] args) {
// 创建一个Buffer,大小为5,即可以存放5个int
IntBuffer intBuffer = IntBuffer.allocate(5);
intBuffer.put(10);
intBuffer.put(11);
intBuffer.put(12);
intBuffer.put(13);
intBuffer.put(14);
// 抛出异常,因为已经没有空间了
// intBuffer.put(15);
// 将buffer转换,读写切换
// intBuffer.flip();
// System.out.println(intBuffer.get());
// System.out.println(intBuffer.get());
// System.out.println(intBuffer.get());
// System.out.println(intBuffer.get());
// System.out.println(intBuffer.get());
// // 抛出异常,因为已经没有数据了
// // System.out.println(intBuffer.get());
// rewind():可重复读
// intBuffer.rewind();
// System.out.println(intBuffer.get());
// System.out.println(intBuffer.get());
// System.out.println(intBuffer.get());
// System.out.println(intBuffer.get());
// System.out.println(intBuffer.get());
// 抛出异常,因为已经没有数据了
// System.out.println(intBuffer.get());
// clear():清空缓冲区,但是缓冲区的数据依然存在,处于“被遗忘”状态
intBuffer.clear();
System.out.println(intBuffer.get());
System.out.println(intBuffer.get());
System.out.println(intBuffer.get());
System.out.println(intBuffer.get());
System.out.println(intBuffer.get());
// 抛出异常,因为已经没有数据了
// System.out.println(intBuffer.get());
// mark():标记
// reset():恢复到mark的位置
// limit():界限,表示缓冲区中可以操作数据的大小。(limit后数据不能进行读写)
// capacity():容量,表示缓冲区中最大存储数据的容量。一旦声明不能改变。
// position():位置,表示缓冲区中正在操作数据的位置。
// 0 <= mark <= position <= limit <= capacity
ByteBuffer buffer = ByteBuffer.allocate(1024);
System.out.println("-------------allocate()-------------");
System.out.println(buffer.position());
System.out.println(buffer.limit());
System.out.println(buffer.capacity());
buffer.put("abcde".getBytes());
System.out.println("-------------put()-------------");
System.out.println(buffer.position());
System.out.println(buffer.limit());
System.out.println(buffer.capacity());
buffer.flip();
System.out.println("-------------flip()-------------");
System.out.println(buffer.position());
System.out.println(buffer.limit());
System.out.println(buffer.capacity());
byte[] dst = new byte[buffer.limit()];
buffer.get(dst);
System.out.println("-------------get()-------------");
System.out.println(buffer.position());
System.out.println(buffer.limit());
System.out.println(buffer.capacity());
System.out.println(new String(dst, 0, dst.length));
buffer.rewind();
System.out.println("-------------rewind()-------------");
System.out.println(buffer.position());
System.out.println(buffer.limit());
System.out.println(buffer.capacity());
buffer.clear();
System.out.println("-------------clear()-------------");
System.out.println(buffer.position());
System.out.println(buffer.limit());
System.out.println(buffer.capacity());
System.out.println((char) buffer.get());
// 总结方法:
// allocate():分配一个指定大小的缓冲区
// put():存入数据到缓冲区中
// flip():切换读取数据模式
// get():获取缓冲区中的数据
// rewind():可重复读
// clear():清空缓冲区,但是缓冲区的数据依然存在,处于“被遗忘”状态
// mark():标记
// reset():恢复到mark的位置
// limit():界限,表示缓冲区中可以操作数据的大小。(limit后数据不能进行读写)
// capacity():容量,表示缓冲区中最大存储数据的容量。一旦声明不能改变。
// position():位置,表示缓冲区中正在操作数据的位置。
// 0 <= mark <= position <= limit <= capacity
}
}
🌈🌈直接缓冲区与非直接缓冲区
- 非直接缓冲区
通过 allocate() 方法分配缓冲区,将缓冲区建立在 JVM 的内存之中。
// 分配直接缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// 判断是直接缓冲区还是非直接缓冲区
System.out.println(byteBuffer.isDirect());
- 直接缓冲区
通过 allocateDirect() 方法分配缓冲区,将缓冲区建立在物理内存之中。
// 分配直接缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
// 判断是直接缓冲区还是非直接缓冲区
System.out.println(byteBuffer.isDirect());
3.2 Channel
- 通道(channel):由 java.nio.channels 包定义的,Channel 表示 IO 源与目标打开的连接,Channel 类似于传统的流,只不过 Channel 本身不能直接访问数据,Channel 只能与 Buffer 进行交互。
- 通道用于源节点与目标节点的连接,在 Java NIO 中负责缓冲区中数据的传输,Channel 本身不存储数据,因此需要配合缓冲区进行传输。
🌈🌈类型
java.nio.channels.Channel 包下:
- 1.FileChannel 用于文件IO操作
- 2.DatagramChannel 用于UDP的IO操作
- 3.SocketChannel 用于TCP的传输操作
- 4.ServerSocketChannel 用于TCP连接监听操作
本地 IO:FileInputStream/FileOutputStream,RandomAccessFile
网络 IO: Socket,ServerSocket,DatagramSocket
🌈🌈案例
package com.shu.nio;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
/**
* @description:
* @author: shu
* @createDate: 2023/4/24 14:31
* @version: 1.0
*/
public class ChannelTest {
public static void main(String[] args) throws IOException {
// Channel:用于源节点与目标节点的连接。
// 在Java NIO中负责缓冲区中数据的传输。Channel本身不存储数据,因此需要配合缓冲区进行传输。
// Channel的主要实现类:
// java.nio.channels.Channel接口:
// |--FileChannel
// |--SocketChannel
// |--ServerSocketChannel
// |--DatagramChannel
// 获取通道
// 1. Java针对支持通道的类提供了getChannel()方法
// 本地IO:
// FileInputStream/FileOutputStream
// RandomAccessFile
// 网络IO:
// Socket
// ServerSocket
// DatagramSocket
// 2. 在JDK1.7中的NIO.2针对各个通道提供了静态方法open()
// 3. 在JDK1.7中的NIO.2的Files工具类的newByteChannel()
// 分散(Scatter)与聚集(Gather)
// 分散读取(Scattering Reads):将通道中的数据分散到多个缓冲区中
// 聚集写入(Gathering Writes):将多个缓冲区中的数据聚集到通道中
// 字符集:Charset
// 编码:字符串 -> 字节数组
// 解码:字节数组 -> 字符串
System.out.println("-------------FileInputStream-------------");
FileInputStream fis = new FileInputStream("1.txt");
// 2. 获取通道
FileChannel channel = fis.getChannel();
// 3. 分配指定大小的缓冲区
ByteBuffer buf = ByteBuffer.allocate(1024);
// 4. 将通道中的数据存入缓冲区中
channel.read(buf);
// 5. 切换读取数据的模式
buf.flip();
// 6. 将缓冲区中的数据写入通道中
channel.write(buf);
// 7. 关闭通道
channel.close();
}
}
3.3 Selector
Selector一般称为选择器,也可以翻译为多路复用器,是Java NIO核心组件之一,主要功能是用于检查一个或者多个NIO Channel(通道)的状态是否处于可读、可写。如此可以实现单线程管理多个Channel(通道),当然也可以管理多个网络连接。
🌈🌈IO事件
Java NIO将NIO事件进行了简化,只定义了四个事件,这四种事件用SelectionKey的四个常量来表示,我们在注册时只注册我们感兴趣的事件:
- SelectionKey.OP_CONNECT
- SelectionKey.OP_ACCEPT
- SelectionKey.OP_READ
- SelectionKey.OP_WRITE
🌈🌈案例
package com.shu.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;
/**
* @description: NIO时间服务器服务端
* @author: shu
* @createDate: 2023/4/24 14:40
* @version: 1.0
*/
public class MultiplexerTimeServer implements Runnable {
private Selector selector;
private ServerSocketChannel servChannel;
private volatile boolean stop;
/**
* 初始化多路复用器、绑定监听端口
*
* @param port
*/
public MultiplexerTimeServer(int port) {
try {
selector = Selector.open();
servChannel = ServerSocketChannel.open();
servChannel.configureBlocking(false);
servChannel.socket().bind(new InetSocketAddress(port), 1024);
servChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("The time server is start in port : " + port);
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}
public void stop() {
this.stop = true;
}
@Override
public void run() {
while (!stop) {
try {
selector.select(1000);
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
SelectionKey key = null;
while (it.hasNext()) {
key = it.next();
it.remove();
try {
handleInput(key);
} catch (Exception e) {
if (key != null) {
key.cancel();
if (key.channel() != null) {
key.channel().close();
}
}
}
}
} catch (Throwable t) {
t.printStackTrace();
}
}
// 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
if (selector != null) {
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void handleInput(SelectionKey key) throws IOException {
if (key.isValid()) {
// 处理新接入的请求消息
if (key.isAcceptable()) {
// Accept the new connection
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
// Add the new connection to the selector
sc.register(selector, SelectionKey.OP_READ);
}
if (key.isReadable()) {
// Read the data
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
if (readBytes > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes, "UTF-8");
System.out.println("The time server receive order : "
+ body);
String currentTime = "QUERY TIME ORDER"
.equalsIgnoreCase(body) ? new java.util.Date(
System.currentTimeMillis()).toString()
: "BAD ORDER";
doWrite(sc, currentTime);
} else if (readBytes < 0) {
// 对端链路关闭
key.cancel();
sc.close();
} else
; // 读到0字节,忽略
}
}
}
private void doWrite(SocketChannel channel, String response)
throws IOException {
if (response != null && response.trim().length() > 0) {
byte[] bytes = response.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
channel.write(writeBuffer);
}
}
}
🌈🌈理解
简单理解Selector不断的轮循事件Key,查询自己注册的事件,然后做对应的事情比如
private void handleInput(SelectionKey key) throws IOException {
if (key.isValid()) {
// 处理新接入的请求消息
if (key.isAcceptable()) {
// Accept the new connection
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
// Add the new connection to the selector
sc.register(selector, SelectionKey.OP_READ);
}
if (key.isReadable()) {
// Read the data
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
if (readBytes > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes, "UTF-8");
System.out.println("The time server receive order : "
+ body);
String currentTime = "QUERY TIME ORDER"
.equalsIgnoreCase(body) ? new java.util.Date(
System.currentTimeMillis()).toString()
: "BAD ORDER";
doWrite(sc, currentTime);
} else if (readBytes < 0) {
// 对端链路关闭
key.cancel();
sc.close();
} else
; // 读到0字节,忽略
}
}
}
更多推荐
所有评论(0)