创建阻塞的服务器
当 ServerSocketChannel
与 SockelChannel
采用默认的阻塞模式时,为了同时处理多个客户的连接,必须使用多线程
public class EchoServer { private int port = 8000; private ServerSocketChannel serverSocketChannel = null; private ExecutorService executorService; //线程池 private static final int POOL_MULTIPLE = 4; //线程池中工作线程的数目 public EchoServer() throws IOException { //创建一个线程池 executorService = Executors.newFixedThreadPool( Runtime.getRuntime().availableProcessors() * POOL_MULTIPLE); //创建一个ServerSocketChannel对象 serverSocketChannel = ServerSocketChannel.open(); //使得在同一个主机上关闭了服务器程序,紧接着再启动该服务器程序时,可以顺利绑定相同的端口 serverSocketChannel.socket().setReuseAddress(true); //把服务器进程与一个本地端口绑定 serverSocketChannel.socket().bind(new InetSocketAddress(port)); System.out.println("服务器启动"); } public void service() { while (true) { SocketChannel socketChannel = null; try { socketChannel = serverSocketChannel.accept(); //处理客户连接 executorService.execute(new Handler(socketChannel)); } catch(IOException e) { e.printStackTrace(); } } } public static void main(String args[])throws IOException { new EchoServer().service(); } //处理客户连按 class Handler implements Runnable { private SocketChannel socketChannel; public Handler(SocketChannel socketChannel) { this.socketChannel = socketChannel; } public void run() { handle(socketChannel); } public void handle(SocketChannel socketChannel) { try { //获得与socketChannel关联的Socket对象 Socket socket = socketChannel.socket(); System.out.println("接收到客户连接,来自:" + socket.getInetAddress() + ":" + socket.getPort()); BufferedReader br = getReader(socket); PrintWriter pw = getWriter(socket); String msg = null; while ((msg = br.readLine()) != null) { System.out.println(msg); pw.println(echo(msg)); if (msg.equals("bye")) { break; } } } catch (IOException e) { e.printStackTrace(); } finally { try { if(socketChannel != null) { socketChannel.close(); } catch (IOException e) { e.printStackTrace(); } } } } } private PrintWriter getWriter(Socket socket) throws IOException { OutputStream socketOut = socket.getOutputStream(); return new PrintWriter(socketOut,true); } private BufferedReader getReader(Socket socket) throws IOException { InputStream socketIn = socket.getInputStream(); return new BufferedReader(new InputStreamReader(socketIn)); } public String echo(String msg) { return "echo:" + msg; }}
创建非阻塞的服务器
在非阻塞模式下,EchoServer
只需要启动一个主线程,就能同时处理三件事:
- 接收客户的连接
- 接收客户发送的数据
- 向客户发回响应数据
EchoServer
委托 Selector
来负责监控接收连接就绪事件、读就绪事件和写就绪事件如果有特定事件发生,就处理该事件
// 创建一个Selector对象selector = Selector.open();//创建一个ServerSocketChannel对象serverSocketChannel = ServerSocketChannel.open();//使得在同一个主机上关闭了服务器程序,紧接着再启动该服务器程序时//可以顺利绑定到相同的端口serverSocketChannel.socket().setReuseAddress(true);//使ServerSocketChannel工作于非阻塞模式serverSocketChannel.configureBlocking(false)://把服务器进程与一个本地端口绑定serverSocketChannelsocket().bind(new InetSocketAddress(port));
EchoServer
类的 service()
方法负责处理本节开头所说的三件事,体现其主要流程的代码如下:
public void service() throws IOException { serverSocketChannel.reqister(selector, SelectionKey.OP_ACCEPT); //第1层while循环 while(selector.select() > 0) { //获得Selector的selected-keys集合 Set readyKeys = selector.selectedKeys(); Iterator it = readyKeys.iterator(); //第2层while循环 while (it.hasNext()) { SelectionKey key = null; //处理SelectionKey try { //取出一个SelectionKey key = (SelectionKey) it.next(); //把 SelectionKey从Selector 的selected-key 集合中删除 it.remove(); 1f (key.isAcceptable()) { 处理接收连接就绪事件; } if (key.isReadable()) { 处理读就绪水件; } if (key.isWritable()) { 处理写就绪事件; } } catch(IOException e) { e.printStackTrace(); try { if(key != null) { //使这个SelectionKey失效 key.cancel(); //关闭与这个SelectionKey关联的SocketChannel key.channel().close(); } } catch(Exception ex) { e.printStackTrace(); } } } }}
- 首先由
ServerSocketChannel
向Selector
注册接收连接就绪事件,如果Selector
监控到该事件发生,就会把相应的SelectionKey
对象加入selected-keys
集合 - 第一层 while 循环,不断询问
Selector
已经发生的事件,select()
方法返回当前相关事件已经发生的SelectionKey
的个数,如果当前没有任何事件发生,该方法会阻塞下去,直到至少有一个事件发生。Selector
的selectedKeys()
方法返回selected-keys
集合,它存放了相关事件已经发生的SelectionKey
对象 - 第二层 while 循环,从
selected-keys
集合中依次取出每个SelectionKey
对象并从集合中删除,,然后调用isAcceptable()
、isReadable()
和isWritable()
方法判断到底是哪种事件发生了,从而做出相应的处理
1. 处理接收连接就绪事件
if (key.isAcceptable()) { //获得与SelectionKey关联的ServerSocketChannel ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); //获得与客户连接的SocketChannel SocketChannel socketChannel = (SocketChannel) ssc.accept(); //把Socketchannel设置为非阻塞模式 socketChannel.configureBlocking(false); //创建一个用于存放用户发送来的数据的级冲区 ByteBuffer buffer = ByteBuffer.allocate(1024); //Socketchannel向Selector注册读就绪事件和写就绪事件 socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer);}
2. 处理读就绪事件
public void receive(SelectionKey key) throws IOException { //获得与SelectionKey关联的附件 ByteBuffer buffer = (ByteBuffer) key.attachment(); //获得与SelectionKey关联的Socketchannel SocketChannel socketChannel = (SocketChannel)key.channel(); //创建一个ByteBuffer用于存放读到的数据 ByteBuffer readBuff = ByteBuffer.allocate(32); socketChannel.read(readBuff); readBuff.flip(); //把buffer的极限设为容量 buffer.limit(buffer.capacity()); //把readBuff中的内容拷贝到buffer buffer.put(readBuff);}
3. 处理写就绪事件
public void send(SelectionKey key) throws IOException { //获得与SelectionKey关联的ByteBuffer ByteBuffer buffer = (ByteBuffer) key.attachment(); //获得与SelectionKey关联的SocketChannel SocketChannel socketChannel = (SocketChannel) key.channel(); buffer.flip(); //按照GBK编码把buffer中的字节转换为字符串 String data = decode(buffer); //如果还没有读到一行数据就返回 if(data.indexOf("\r\n") == -1) return; //截取一行数据 String outputData = data.substring(0, data.indexOf("\n") + 1); //把输出的字符串按照GBK编码转换为字节,把它放在outputBuffer中 ByteBuffer outputBuffer = encode("echo:" + outputData); //输出outputBuffer的所有字节 while(outputBuffer,hasRemaining()) socketChannel.write(outputBuffer); //把outputData字符审按照GBK编码,转换为字节,把它放在ByteBuffer ByteBuffer temp = encode(outputData); //把buffer的位置设为temp的极限 buffer.position(temp.limit()): //删除buffer已经处理的数据 buffer.compact(); //如果已经输出了字符串“bye\r\n”,就使SelectionKey失效,并关闭SocketChannel if(outputData.equals("bye\r\n")) { key.cancel(); socketChannel.close(); }}
完整代码如下:
public class EchoServer { private int port = 8000; private ServerSocketChannel serverSocketChannel = null; private Selector selector; private Charset charset = Charset.forName("GBK");public EchoServer() throws IOException { // 创建一个Selector对象 selector = Selector.open(); //创建一个ServerSocketChannel对象 serverSocketChannel = ServerSocketChannel.open(); //使得在同一个主机上关闭了服务器程序,紧接着再启动该服务器程序时 //可以顺利绑定到相同的端口 serverSocketChannel.socket().setReuseAddress(true); //使ServerSocketChannel工作于非阻塞模式 serverSocketChannel.configureBlocking(false): //把服务器进程与一个本地端口绑定 serverSocketChannelsocket().bind(new InetSocketAddress(port)); } public void service() throws IOException { serverSocketChannel.reqister(selector, SelectionKey.OP_ACCEPT); //第1层while循环 while(selector.select() > 0) { //获得Selector的selected-keys集合 Set readyKeys = selector.selectedKeys(); Iterator it = readyKeys.iterator(); //第2层while循环 while (it.hasNext()) { SelectionKey key = null; //处理SelectionKey try { //取出一个SelectionKey key = (SelectionKey) it.next(); //把 SelectionKey从Selector 的selected-key 集合中删除 it.remove(); 1f (key.isAcceptable()) { //获得与SelectionKey关联的ServerSocketChannel ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); //获得与客户连接的SocketChannel SocketChannel socketChannel = (SocketChannel) ssc.accept(); //把Socketchannel设置为非阻塞模式 socketChannel.configureBlocking(false); //创建一个用于存放用户发送来的数据的级冲区 ByteBuffer buffer = ByteBuffer.allocate(1024); //Socketchannel向Selector注册读就绪事件和写就绪事件 socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer); } if (key.isReadable()) { receive(key); } if (key.isWritable()) { send(key); } } catch(IOException e) { e.printStackTrace(); try { if(key != null) { //使这个SelectionKey失效 key.cancel(); //关闭与这个SelectionKey关联的SocketChannel key.channel().close(); } } catch(Exception ex) { e.printStackTrace(); } } } } } public void receive(SelectionKey key) throws IOException { //获得与SelectionKey关联的附件 ByteBuffer buffer = (ByteBuffer) key.attachment(); //获得与SelectionKey关联的Socketchannel SocketChannel socketChannel = (SocketChannel)key.channel(); //创建一个ByteBuffer用于存放读到的数据 ByteBuffer readBuff = ByteBuffer.allocate(32); socketChannel.read(readBuff); readBuff.flip(); //把buffer的极限设为容量 buffer.limit(buffer.capacity()); //把readBuff中的内容拷贝到buffer buffer.put(readBuff); } public void send(SelectionKey key) throws IOException { //获得与SelectionKey关联的ByteBuffer ByteBuffer buffer = (ByteBuffer) key.attachment(); //获得与SelectionKey关联的SocketChannel SocketChannel socketChannel = (SocketChannel) key.channel(); buffer.flip(); //按照GBK编码把buffer中的字节转换为字符串 String data = decode(buffer); //如果还没有读到一行数据就返回 if(data.indexOf("\r\n") == -1) return; //截取一行数据 String outputData = data.substring(0, data.indexOf("\n") + 1); //把输出的字符串按照GBK编码转换为字节,把它放在outputBuffer中 ByteBuffer outputBuffer = encode("echo:" + outputData); //输出outputBuffer的所有字节 while(outputBuffer,hasRemaining()) socketChannel.write(outputBuffer); //把outputData字符审按照GBK编码,转换为字节,把它放在ByteBuffer ByteBuffer temp = encode(outputData); //把buffer的位置设为temp的极限 buffer.position(temp.limit()): //删除buffer已经处理的数据 buffer.compact(); //如果已经输出了字符串“bye\r\n”,就使SelectionKey失效,并关闭SocketChannel if(outputData.equals("bye\r\n")) { key.cancel(); socketChannel.close(); } } //解码 public String decode(ByteBuffer buffer) { CharBuffer charBuffer = charset.decode(buffer); return charBuffer.toStrinq(); } //编码 public ByteBuffer encode(String str) { return charset.encode(str); } public static void main(String args[])throws Exception { EchoServer server = new EchoServer(); server.service(); }}
阻塞模式与非阻塞模式混合使用
使用非阻塞模式时,ServerSocketChannel
以及 SocketChannel
都被设置为非阻塞模式,这使得接收连接、接收数据和发送数据的操作都采用非阻塞模式,EchoServer
采用一个线程同时完成这些操作
假如有许多客户请求连接,可以把接收客户连接的操作单独由一个线程完成,把接收数据和发送数据的操作由另一个线程完成,这可以提高服务器的并发性能
负责接收客户连接的线程按照阻塞模式工作,如果收到客户连接,就向 Selector
注册读就绪和写就绪事件,否则进入阻塞状态,直到接收到了客户的连接。负责接收数据和发送数据的线程按照非阻塞模式工作,只有在读就绪或写就绪事件发生时,才执行相应的接收数据和发送数据操作
public class EchoServer { private int port = 8000; private ServerSocketChannel serverSocketChannel = null; private Selector selector = null; private Charset charset = Charset.forName("GBK");public EchoServer() throws IOException { selector = Selector.open(); serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().setReuseAddress(true); serverSocketChannelsocket().bind(new InetSocketAddress(port)); } public void accept() { while(true) { try { SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(false); ByteBuffer buffer = ByteBuffer.allocate(1024); synchronized(gate) { selector.wakeup(); socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer); } } catch(IOException e) { e.printStackTrace(); } } } private Object gate=new Object(); public void service() throws IOException { while(true) { synchronized(gate){} int n = selector.select(); if(n == 0) continue; Set readyKeys = selector.selectedKeys(); Iterator it = readyKeys.iterator(); while (it.hasNext()) { SelectionKey key = null; try { it.remove(); if (key.isReadable()) { receive(key); } if (key.isWritable()) { send(key); } } catch(IOException e) { e.printStackTrace(); try { if(key != null) { key.cancel(); key.channel().close(); } } catch(Exception ex) { e.printStackTrace(); } } } } } public void receive(SelectionKey key) throws IOException { ... } public void send(SelectionKey key) throws IOException { ... } public String decode(ByteBuffer buffer) { ... } public ByteBuffer encode(String str) { ... } public static void main(String args[])throws Exception { final EchoServer server = new EchoServer(); Thread accept = new Thread() { public void run() { server.accept(); } }; accept.start();server.service(); }}
注意一点:主线程的 selector select()
方法和 Accept 线程的 register(...)
方法都会造成阻塞,因为他们都会操作 Selector
对象的共享资源 all-keys
集合,这有可能会导致死锁
导致死锁的具体情形是:Selector
中尚没有任何注册的事件,即 all-keys
集合为空,主线程执行 selector.select()
方法时将进入阻塞状态,只有当 Accept 线程向 Selector
注册了事件,并且该事件发生后,主线程才会从 selector.select()
方法返回。然而,由于主线程正在 selector.select()
方法中阻塞,这使得 Acccept
线程也在 register()
方法中阻塞。Accept 线程无法向 Selector 注册事件,而主线程没有任何事件可以监控,所以这两个线程将永远阻塞下去
为了避免对共享资源的竞争,同步机制使得一个线程执行 register()
时,不允许另一个线程同时执行 select()
方法,反之亦然