接上文 Java NIO使用(下)
SocketChannel是一个连接到TCP的通道,可以通过一个SocketChannel连接到服务端。
服务端
public class Server {public static void main(String[] args) {ServerSocket serverSocket = null;InputStream in = null;try {serverSocket = new ServerSocket(8080);int recvMsgSize = 0;byte[] recvBuf = new byte[1024];while (true) {Socket clntSocket = serverSocket.accept();SocketAddress clientAddress = clntSocket.getRemoteSocketAddress();System.out.println("Handling client at " + clientAddress);in = clntSocket.getInputStream();while ((recvMsgSize = in.read(recvBuf)) != -1) {byte[] temp = new byte[recvMsgSize];System.arraycopy(recvBuf, 0, temp, 0, recvMsgSize);System.out.println(new String(temp));}}} catch (IOException e) {e.printStackTrace();} finally {try {if (serverSocket != null)serverSocket.close();if (in != null)in.close();} catch (IOException e) {e.printStackTrace();}}}
}
客户端:
public class Client {public static void main(String[] args) {ByteBuffer buffer = ByteBuffer.allocate(1024);SocketChannel socketChannel = null;try {socketChannel = SocketChannel.open();socketChannel.configureBlocking(false);socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));if (socketChannel.finishConnect()) {int i = 0;while (true) {TimeUnit.SECONDS.sleep(1);String info = "I'm " + (i++) + "-th information from client";buffer.clear();buffer.put(info.getBytes());buffer.flip();while (buffer.hasRemaining()) {System.out.println(buffer);socketChannel.write(buffer);}}}} catch (Exception e) {e.printStackTrace();} finally {try {if (socketChannel != null) {socketChannel.close();}} catch (IOException e) {e.printStackTrace();}}}
}
SocketChannel的write()方法无法保证能写多少字节到SocketChannel。所以,我们重复调用write()直到Buffer没有要写的字节为止。非阻塞模式下,read()方法在尚未读取到任何数据时可能就返回了。所以需要关注它的int返回值,它会告诉你读取了多少字节。
ServerSocketChannel是一个可以用来监听新的TCP连接的通道,是服务端。 Selector(选择器)是Java
NIO中能够检测多个NIO通道,并能够知晓通道是否为诸如读写事件做好准备的组件。这样,一个单独的线程可以管理多个channel,从而管理多个网络连接。仅用单个线程来处理多个Channels的好处是,只需要更少的新城来处理通道。事实上,可以只用一个线程处理所有的通道。
//打开网络通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//设置通道是非阻塞
serverSocketChannel.configureBlocking(false);
Selector selector = Selector.open();
//将指定的通道注册到选择其中并指定监听事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
与Selector一起使用时,Channel必须是非阻塞模式。(FileChannel不能设置成非阻塞,所以FileChannel不能和Selector一起使用)。
register()方法的第二个参数,在源码的注释中写道: The interest set for the resulting key
叫做感兴趣的集合。意识是指定Selector对Channel的哪些事件感兴趣。 事件类型有四种:
a) connect:客户端连接服务端的事件;在客户端注册
b) accept:服务端接收到客户端的连接
c) read:读取客户端发来的内容
d) write:向客户端写数据
客户端connect服务端,connect成功后,服务端开始准备accept。准备就绪后可以进行读写。
ServerSocketChannel中支持的类型:
public final int validOps() {return SelectionKey.OP_ACCEPT;
}
SocketChannel中支持的事件:
public final int validOps() {return (SelectionKey.OP_READ| SelectionKey.OP_WRITE| SelectionKey.OP_CONNECT);
}
当向Selector注册Channel时,register()方法会返回一个SelectionKey对象。SelectionKey用于跟踪被注册的事件。这个对象包含了一些你感兴趣的属性:
• interest集合
• ready集合
• Channel
• Selector
• 附加的对象(可选)
interest集合:就像向Selector注册通道一节中所描述的,interest集合是你所选择的感兴趣的事件集合(读写等事件)。可以通过SelectionKey读写interest集合。
ready 集合是通道已经准备就绪的操作的集合。在一次选择(Selection)之后,你会首先访问这个ready set。
int readySet = selectionKey.readyOps();
SelectionKey有四个方法,它们都会返回一个布尔类型:
selectionKey.isAcceptable();服务端接收到客户端的连接请求
selectionKey.isConnectable();客户端连接服务端成功
selectionKey.isReadable();//从通道读取
selectionKey.isWritable();向通道写入
从SelectionKey访问Channel和Selector很简单。如下:
Channel channel = selectionKey.channel();
Selector selector = selectionKey.selector();
可以将一个对象或者更多信息附着到SelectionKey上,这样就能方便的识别某个给定的通道。例如,可以附加与通道一起使用的Buffer,或是包含聚集数据的某个对象。使用方法如下:
selectionKey.attach(theObject);
Object attachedObj = selectionKey.attachment()
还可以在用register()方法向Selector注册Channel的时候附加对象。如:
SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);
Selector对象会包含3中类型的SelectionKey集合:
1)all-keys:当前所有向Selector注册的SelectionKey集合。Selectir的keys()方法返回的该集合。
public Set keys() {if (!this.isOpen() && !Util.atBugLevel("1.4")) {throw new ClosedSelectorException();} else {return this.publicKeys;}
}
2)selected-keys集合。
相关事件已经被Selector捕获的SelectionKey的集合,Selector的selectedKeys()方法返回该集合:
public Set selectedKeys() {if (!this.isOpen() && !Util.atBugLevel("1.4")) {throw new ClosedSelectorException();} else {return this.publicSelectedKeys;}
}
3)canceld-keys集合。已经被取消的SelectionKey的集合,Selector没有提供访问这种集合的方法当。当调用SelectionKey的cancel()方法时,key会加入到这个集合中:
private final Set cancelledKeys = new HashSet();void cancel(SelectionKey k) { // package-privatesynchronized (cancelledKeys) {cancelledKeys.add(k);}
}
当Selector的select()方法执行时,如果与SelectionKey相关的事件发生了,这个SelectionKey就被加入到selected-keys集合中,程序直接调用selected-keys集合的remove()方法,或者调用它的iterator的remove()方法,都可以从selected-keys集合中删除一个SelectionKey对象。Selector不会删除selectedKeys()中的成员,selectedKeys()中保存着上次轮询留下来的成员,但上一次的事件,这次轮询中可能没有准备好,所以会出错。需要从selectedKeys()中手动移除。从selectedKeys()中移除不会影响已经注册的通道,已经注册的通道是在keys()中。即selectedKeys()中是触发事件的集合。简单的说,程序中要循环执行Selector的select()方法,
//事件监听
private void handlerListen() {try {while (true) {selector.select();//获取事件,如果没有就阻塞Set keys = selector.selectedKeys();for (SelectionKey key : keys) {handler(key);//处理事件}keys.clear();//清空}} catch (IOException e) {e.printStackTrace();}
}
比如某个通道本次触发了READ事件,它会被放入selectedKeys()中,本次处理完成之后,Selector不会把它从selectedKeys()里移除;当程序进入下一次循环的时候,它还在selectedKeys()里,但本次它并不一定触发了事件。所以要我们手动移除。
使用NIO编写一个简单的聊天室:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;public class NioSocketServer {private String ip;private int port;private Map clients;//可以是ip+端口,value=通道private ByteBuffer readBuffer;private ByteBuffer writeBuffer;private Selector selector;public NioSocketServer() {this("localhost", 8111, 512);}public NioSocketServer(String ip, int port, int bufferLength) {this.ip = ip;this.port = port;this.readBuffer = ByteBuffer.allocate(bufferLength);this.writeBuffer = ByteBuffer.allocate(bufferLength);this.clients = new TreeMap<>();initServer();}//初始化private void initServer() {try {ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //打开网络通道serverSocketChannel.configureBlocking(false);//设置通道是非阻塞ServerSocket serverSocket = serverSocketChannel.socket();serverSocket.bind(new InetSocketAddress(ip, port));//绑定ip和端口selector = Selector.open();//将指定的通道注册到选择其中并指定监听事件serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);System.out.println("服务端已经启动=" + ip + ":" + port);handlerListen();} catch (IOException e) {e.printStackTrace();}}//事件监听private void handlerListen() {try {while (true) {selector.select();//获取事件,如果没有就阻塞Set keys = selector.selectedKeys();for (SelectionKey key : keys) {handler(key);//处理事件}keys.clear();//清空,否则下一次迭代中还会存在本次的SelectionKey}} catch (IOException e) {e.printStackTrace();}}//处理事件public void handler(SelectionKey key) {ServerSocketChannel server;SocketChannel socketChannel;try {if (key.isAcceptable()) {//有连接了server = (ServerSocketChannel) key.channel();socketChannel = server.accept();socketChannel.configureBlocking(false);socketChannel.register(selector, SelectionKey.OP_READ);//监听读取事件String clientKey = socketChannel.socket().getInetAddress().getHostAddress() + ":" + socketChannel.socket().getPort();if (!clients.containsKey(clientKey)) {clients.put(clientKey, socketChannel);}} else if (key.isReadable()) {//可以读取,客户端给服务端发消息了socketChannel = (SocketChannel) key.channel();readBuffer.clear();int len = socketChannel.read(readBuffer);readBuffer.flip();System.out.println("可读取:" + new String(readBuffer.array(), 0, len));if (len > 0) {String name = socketChannel.socket().getInetAddress().getHostAddress() + ":" + socketChannel.socket().getPort();for (String clientKey : clients.keySet()) {if (name.equals(clientKey)) {//不推送给自己continue;}//逐一发送,把消息发送给聊天室其它人SocketChannel channel = clients.get(clientKey);writeBuffer.clear();writeBuffer.put(name.getBytes());writeBuffer.put("说:".getBytes());writeBuffer.put(readBuffer);writeBuffer.flip();channel.write(writeBuffer);}}}} catch (IOException e) {e.printStackTrace();}}
}
客户端
package com.test.nio;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Scanner;
import java.util.Set;public class NioSocketClient {private String serverIp;private int serverPort;private Selector selector;private SocketChannel socketChannel;private ByteBuffer readBuffer;private ByteBuffer writeBuffer;private String readMsg;private String writeMsg;public NioSocketClient() {this("localhost", 8111, 512);}public NioSocketClient(String serverIp, int serverPort, int bufferLength) {this.serverIp = serverIp;this.serverPort = serverPort;readBuffer = ByteBuffer.allocate(bufferLength);writeBuffer = ByteBuffer.allocate(bufferLength);init();listen();}private void init() {try {socketChannel = SocketChannel.open();socketChannel.configureBlocking(false);selector = Selector.open();socketChannel.register(selector, SelectionKey.OP_CONNECT);socketChannel.connect(new InetSocketAddress(serverIp, serverPort));} catch (IOException e) {e.printStackTrace();}}private void listen() {try {while (true) {selector.select();//阻塞,获取事件Set keys = selector.selectedKeys();for (SelectionKey key : keys) {handler(key); //处理事件}keys.clear();}} catch (IOException e) {e.printStackTrace();}}private void handler(SelectionKey key) {SocketChannel channel;try {if (key.isConnectable()) {//连接上了channel = (SocketChannel) key.channel();if (channel.isConnectionPending()) {//验证连接是否通畅channel.finishConnect();//完成连接writeMsg = "连接服务端成功!";writeBuffer.clear();writeBuffer.put(writeMsg.getBytes());writeBuffer.flip();channel.write(writeBuffer);channel.register(selector, SelectionKey.OP_READ);write(channel);}} else if (key.isReadable()) {channel = (SocketChannel) key.channel();readBuffer.clear();int len = channel.read(readBuffer);if (len > 0) {readMsg = new String(readBuffer.array(), 0, len);System.out.println(readMsg);}}} catch (IOException e) {e.printStackTrace();}}private void write(SocketChannel channel) {new Thread(() -> {try {Scanner scanner = new Scanner(System.in);while (true) {System.out.println("请输入聊天内容");writeMsg = scanner.nextLine();writeBuffer.clear();writeBuffer.put(writeMsg.getBytes());writeBuffer.flip();channel.write(writeBuffer);}} catch (IOException e) {e.printStackTrace();}}).start();}
}
public static void reveive() {DatagramChannel channel = null;try {channel = DatagramChannel.open();channel.socket().bind(new InetSocketAddress(8888));ByteBuffer buf = ByteBuffer.allocate(1024);buf.clear();channel.receive(buf);buf.flip();while (buf.hasRemaining()) {System.out.print((char) buf.get());}System.out.println();} catch (IOException e) {e.printStackTrace();} finally {try {if (channel != null)channel.close();} catch (IOException e) {e.printStackTrace();}}}public static void send() {DatagramChannel channel = null;try {channel = DatagramChannel.open();String info = "I'm the Sender!";ByteBuffer buf = ByteBuffer.allocate(1024);buf.clear();buf.put(info.getBytes());buf.flip();int bytesSent = channel.send(buf, new InetSocketAddress("127.0.0.1", 8888));System.out.println(bytesSent);} catch (IOException e) {e.printStackTrace();} finally {try {if (channel != null)channel.close();} catch (IOException e) {e.printStackTrace();}}}