阻塞型IO,当用户进程发起read操作,一直阻塞直到数据拷贝到用户空间为止才返回。
/*** 1.打开命令行窗口: telnet localhost 8001* 2.进入发送消息模式: Ctrl + ]* 3.使用send命令发送消息: send hello*/
public class BIOServer {public static void main(String[] args) throws IOException {// ①启动服务端,绑定8001端口ServerSocket serverSocket = new ServerSocket(8001);System.out.println("server start");// ②while (true) {// ③开始接受客户端连接.如果没有新的客户端连接, 则会一直阻塞在这里Socket socket = serverSocket.accept();System.out.println("one client conn: " + socket);// ④启动一个线程处理连接new Thread(()->{try {InputStream inputStream = socket.getInputStream();while (true) {// 创建1KB缓冲区byte[] buffer = new byte[1024];// 如果没有数据返回, 就会一直阻塞当前线程. 读取到的数据会存储到buffer中// 返回读取到的字节个数. -1代表没有数据可读int readLength = inputStream.read(buffer);if (readLength < 0) {break;}byte[] readBytes = new byte[readLength];for (int i = 0; i < readLength; i++) {readBytes[i] = buffer[i];}// 将字节数据转换成字符串String msg = new String(readBytes,"UTF-8");System.out.println(socket + ": " + msg);if ("quit".equals(msg)) {break;}// 给客户端发送一个响应信息OutputStream outputStream = socket.getOutputStream();outputStream.write("ok".getBytes());outputStream.flush();}} catch (IOException e) {e.printStackTrace();} finally {System.out.println(socket + " close");try {socket.close();} catch (IOException e) {e.printStackTrace();}}}).start();}}
}
①首先启动一个服务端
ServerSocket serverSocket = new ServerSocket(8001);
②因为存在多个客户端连接,所以声明一个while循坏来等待客户端的链接。
③等待客户连接
Socket socket = serverSocket.accept();
接受客户端的连接,并把这个连接(Socket)保存下来,用于后续读取数据。
④分配一个线程来处理这个Socket
如果不新建线程,那么只能使用主线程来处理这个Socket,而主线程只能同时处理一个Socket,
不能同时处理多个客户端连接。
BIO编写网络程序很简单,但是每次来一个客户端都要分配一个线程,
如果客户端一直增加,服务端线程会无限增加,直到服务器资源耗尽。
NIO网络编程可以解决这个问题。
Java中的NIO使用的是IO多路复用技术实现的。
IO多路复用:将多个Socket注册到一个选择器(Selector),这个选择器会帮助我们去询问哪些Socket的数据准备好了。
相当于可以使用一个线程来管理多个Socket连接。
public class NIOServer {public static void main(String[] args) throws IOException {// 创建一个SelectorSelector selector = Selector.open();// 创建ServerSocketChannelServerSocketChannel serverSocketChannel = ServerSocketChannel.open();// 绑定8001端口serverSocketChannel.bind(new InetSocketAddress(8001));// 设置为非阻塞模式serverSocketChannel.configureBlocking(false);// 将Channel注册到Selector上,并注册Accept事件,Selector能为我们监听Accept事件serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);System.out.println("server start");while (true) {// 阻塞在select上(第一阶段阻塞)selector.select();// 如果使用的是select(timeout)或selectNow()需要判断返回值是否大于0// 有就绪的ChannelSet selectionKeys = selector.selectedKeys();// 遍历selectKeysIterator iterator = selectionKeys.iterator();while (iterator.hasNext()) {SelectionKey selectionKey = iterator.next();// 如果是accept事件if (selectionKey.isAcceptable()) {// 强制转换为 ServerSocketChannelServerSocketChannel ssc = (ServerSocketChannel) selectionKey.channel();SocketChannel socketChannel = ssc.accept();System.out.println("accept new conn: " + socketChannel.getRemoteAddress());// 设置为非阻塞模式socketChannel.configureBlocking(false);// 将SocketChannel注册到Selector上,并注册读事件socketChannel.register(selector, SelectionKey.OP_READ);} else if (selectionKey.isReadable()) {// 如果是读取事件, 强制转换为SocketChannelSocketChannel socketChannel = (SocketChannel) selectionKey.channel();// 创建Buffer用于读取数据ByteBuffer buffer = ByteBuffer.allocate(1024);int length = 0;try {// 将数据读入到buffer中, 读取数据是阻塞的length = socketChannel.read(buffer);} catch (Exception e) {// 当客户端强制关闭连接, 也会有可读事件, 此时read操作会抛出异常System.out.println(socketChannel + " close");socketChannel.close();}if (length > 0) {buffer.flip();// 可读数据的长度int remaining = buffer.remaining();byte[] bytes = new byte[remaining];// 将数据读入到byte数组中buffer.get(bytes);// 将字节转换成字符串String msg = new String(bytes, "UTF-8");System.out.println("receive msg: " + msg);if ("quit".equals(msg)) {System.out.println(socketChannel + " close");socketChannel.close();}}}iterator.remove();}}}
}
①首先创建一个Selector
,相当于IO多路复用中的选择器。
②然后创建ServerSocketChannel
,并设置为非阻塞模式,是服务端进程。
③对于服务端来说,需要知道是否有新的连接进来,所以将ServerSocketChannel
注册到Selector
并监听ACCEPT事件。
④开始while死循环,让Selector
不断地去轮训。
Selector
调用select
方法会阻塞,直到有监听的事件发生才会返回。
⑤
select
方法返回后会拿到一系列SelectionKey
,
每个SelectionKey
里面都绑定了一个准备好数据的Channel
。
如果是ACCEPT事件,说明有新的客户端连接,Channel
为ServerSocketChannel
,
从ServerSocketChannel
中获取客户端连接SocketChannel
。
有客户端连接进来后,我们需要关心这个客户端连接是否有可读事件,所以需要将这个
SocketChannel
注册到Selector
并监听READ事件。
如果是READ事件,说明有可读数据,Channel
为SocketChannel
。
NIO编程只使用一个线程就可以处理所有的客户端连接,解决了BIO无限增加线程的问题。
在连接非常多的情况下,一次select
操作可能会返回很多个SelectionKey
,而取读取数据时需要把数据从内核缓冲区拷贝到用户空间,这是一个阻塞操作,读取完数据还需要做一些业务处理,处理完业务可能还会给客户端发送消息。
如果(读取数据+业务处理+发送消息)的操作都放在主线程里面处理,就会变得非常慢了。
可以把处理数据的部分扔到线程池中来处理。
/*** 发送socket的客户端* 这里采用的是BIO模式*/
public class ChatClient {public static void main(String[] args) throws IOException {Socket client = new Socket("localhost", 8001);OutputStream outputStream = client.getOutputStream();InputStream inputStream = client.getInputStream();// 开启这个线程用来监听接收数据new Thread(() -> {try {while (true) {byte[] bytes = new byte[1024];/*** 读取数据 如果没有数据返回 就会一直阻塞当前线程* 没有数据可读返回-1*/int readLength = inputStream.read(bytes);if (readLength < 0) {break;}String result = new String(getReadBytes(bytes, readLength),"UTF-8");System.out.println(result);}} catch (Exception e) {e.printStackTrace();}}).start();Scanner scanner = new Scanner(System.in);while (true) {String msg = scanner.nextLine();outputStream.write(msg.getBytes(StandardCharsets.UTF_8));outputStream.flush();if ("quit".equals(msg)) {client.close();break;}}}public static byte[] getReadBytes(byte[] bytes, int readLength) {byte[] result = new byte[readLength];for (int i = 0; i < readLength; i++) {result[i] = bytes[i];}return result;}
}
/*** 1.打开命令行窗口: telnet localhost 8001* 2.进入发送消息模式: Ctrl + ]* 3.使用send命令发送消息: send hello*/
public class BIOChatServer {public static Map LOGIN_USER_MAP = new ConcurrentHashMap<>();public static AtomicLong idCount = new AtomicLong(10000);public static void main(String[] args) throws IOException {// 启动服务端,绑定8001端口ServerSocket serverSocket = new ServerSocket(8001);System.out.println("server start");while (true) {// 开始接受客户端连接. 如果没有新的客户端连接 则会一直阻塞在这里Socket socket = serverSocket.accept();System.out.println("one client conn: " + socket);// 将当前的Socket连接保存下来joinChat(socket);// 启动线程处理连接数据new Thread(()->{try {InputStream inputStream = socket.getInputStream();while (true) {byte[] bytes = new byte[1024];/*** 读取数据 如果没有数据返回 就会一直阻塞当前线程* 没有数据可读返回-1*/int readLength = inputStream.read(bytes);if (readLength < 0) {break;}String result = new String(getReadBytes(bytes, readLength),"UTF-8");System.out.println(LOGIN_USER_MAP.get(socket) + "==>" + result);// 将信息发送给其它人sendAll(socket, result);if ("quit".equals(result)) {break;}}System.out.println("one client quit: " + socket);LOGIN_USER_MAP.remove(socket);socket.close();} catch (IOException e) {e.printStackTrace();LOGIN_USER_MAP.remove(socket);}}).start();}}private static void sendAll(Socket socket, String result) {Long userId = LOGIN_USER_MAP.get(socket);LOGIN_USER_MAP.forEach((key,value) -> {if (key != socket) {try {OutputStream outputStream = key.getOutputStream();outputStream.write((userId + ":" + result).getBytes(StandardCharsets.UTF_8));outputStream.flush();} catch (Exception e) {e.printStackTrace();}}});}private static void joinChat(Socket socket) {Long userId = idCount.getAndAdd(1);try {OutputStream outputStream = socket.getOutputStream();outputStream.write(("your userId is " + userId).getBytes(StandardCharsets.UTF_8));outputStream.flush();} catch (Exception e) {e.printStackTrace();}// 告诉其它登录用户有人加入了LOGIN_USER_MAP.forEach((key,value) -> {try {OutputStream outputStream = key.getOutputStream();outputStream.write((userId + " is online").getBytes(StandardCharsets.UTF_8));outputStream.flush();} catch (Exception e) {e.printStackTrace();}});LOGIN_USER_MAP.put(socket, userId);}public static byte[] getReadBytes(byte[] bytes, int readLength) {byte[] result = new byte[readLength];for (int i = 0; i < readLength; i++) {result[i] = bytes[i];}return result;}
}
public class NIOChatServer {public static void main(String[] args) throws IOException {Selector selector = Selector.open();ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.bind(new InetSocketAddress(8001));serverSocketChannel.configureBlocking(false);// 将accept事件绑定到selector上serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);System.out.println("server start");while (true) {// 阻塞在select上selector.select();Set selectionKeys = selector.selectedKeys();// 遍历selectKeysIterator iterator = selectionKeys.iterator();while (iterator.hasNext()) {SelectionKey selectionKey = iterator.next();// 如果是accept事件if (selectionKey.isAcceptable()) {ServerSocketChannel ssc = (ServerSocketChannel) selectionKey.channel();SocketChannel socketChannel = ssc.accept();System.out.println("accept new conn: " + socketChannel.getRemoteAddress());socketChannel.configureBlocking(false);socketChannel.register(selector, SelectionKey.OP_READ);// 加入群聊ChatHolder.join(socketChannel);} else if (selectionKey.isReadable()) {// 如果是读取事件SocketChannel socketChannel = (SocketChannel) selectionKey.channel();ByteBuffer buffer = ByteBuffer.allocate(1024);// 将数据读入到buffer中int length = 0;try {length = socketChannel.read(buffer);} catch (IOException e) {// 因为客户端突然断开连接 也会有读取事件 此时读取就会有异常e.printStackTrace();// 退出群聊ChatHolder.quit(socketChannel);selectionKey.cancel();socketChannel.close();}if (length > 0) {buffer.flip();byte[] bytes = new byte[buffer.remaining()];// 将数据读入到byte数组中buffer.get(bytes);String content = new String(bytes, "UTF-8");System.out.println(ChatHolder.USER_MAP.get(socketChannel) + ":" + content);if (content.equalsIgnoreCase("quit")) {// 退出群聊ChatHolder.quit(socketChannel);selectionKey.cancel();socketChannel.close();} else {// 扩散ChatHolder.propagate(socketChannel, content);}}}iterator.remove();}}}private static class ChatHolder {private static final Map USER_MAP = new ConcurrentHashMap<>();public static AtomicLong idCount = new AtomicLong(10000);/*** 加入群聊*/public static void join(SocketChannel socketChannel) {// 有人加入就给他分配一个idLong userId = idCount.getAndAdd(1);send(socketChannel, "your userId is " + userId);for (SocketChannel channel : USER_MAP.keySet()) {send(channel, userId + " is online");}// 将当前用户加入到map中USER_MAP.put(socketChannel, userId);}/*** 退出群聊*/public static void quit(SocketChannel socketChannel) {Long userId = USER_MAP.get(socketChannel);USER_MAP.remove(socketChannel);for (SocketChannel channel : USER_MAP.keySet()) {send(channel, userId + " has quit the group chat");}}/*** 扩散说话的内容*/public static void propagate(SocketChannel socketChannel, String content) {Long userId = USER_MAP.get(socketChannel);for (SocketChannel channel : USER_MAP.keySet()) {if (channel != socketChannel) {send(channel, userId + ":" + content);}}}/*** 发送消息*/private static void send(SocketChannel socketChannel, String msg) {try {ByteBuffer writeBuffer = ByteBuffer.allocate(1024);writeBuffer.put(msg.getBytes());writeBuffer.flip();socketChannel.write(writeBuffer);} catch (Exception e) {e.printStackTrace();}}}
}