通信架构BIO、NIO、AIO
1、BIO、NIO、AIO介绍
(1)BIO
Java BIO也称同步并阻塞(传统阻塞型),服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销
(2)NIO
Java NIO 也称同步非阻塞,服务器实现模式为一个线程处理多个请求(连接),即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有 I/O 请求就进行处理
(3)AIO
Java AIO(也称NIO.2)也称 异步 异步非阻塞,服务器实现模式为一个有效请求一个线程,客户端的I/O请求都是由OS先完成了再通知服务器应用去启动线程进行处理,一般适用于连接数较多且连接时间较长的应用
2、BIO、NIO、AIO适用场景
(1)BIO 方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4以前的唯一选择,但程序简单易理解。
(2)NIO 方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,弹幕系统,服务器间通讯等。
编程比较复杂,JDK1.4 开始支持。
(3)AIO 方式使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用 OS 参与并发操作,
编程比较复杂,JDK7 开始支持。
3、BIO详讲
(1)BIO工作机制
(2)回顾
- 网络编程的基本模型是Client/Server模型,也就是两个进程之间进行相互通信,其中服务端提供位置信(绑定IP地址和端口),客户端通过连接操作向服务端监听的端口地址发起连接请求,基于TCP协议下进行三次握手连接,连接成功后,双方通过网络套接字(Socket)进行通信。
- 传统的同步阻塞模型开发中,服务端ServerSocket负责绑定IP地址,启动监听端口;客户端Socket负责发起连接操作。连接成功后,双方通过输入和输出流进行同步阻塞式通信。 基于BIO模式下的通信,客户端 – 服务端是完全同步,完全耦合的。
(3)案例讲解
以下案例中,只能实现客户端发送消息,服务端接收消息,并不能实现反复的收消息和反复的发消息,我们只需要在客户端案例中,加上反复按照行发送消息的逻辑即可!
没修改前(单发)
serverpublic class server {public static void main(String[] args) {try {System.out.println("============================serverstart=============================");//1、定义ServerSocket对象进行服务端口注册ServerSocket ss = new ServerSocket(9999);//2、监听客户端的Socket连接对象Socket socket = ss.accept();//3、从Socket管道中得到一个字节输入流对象InputStream is = socket.getInputStream();//4、把字节输入流包装成一个字符输入流//BufferedInputStream bis = new BufferedInputStream(is);BufferedReader br = new BufferedReader(newInputStreamReader(is));String msg;if((msg=br.readLine())!=null){System.out.println("服务端接收到"+msg);}} catch (Exception e) {e.printStackTrace();}}}clientpublic class client {public static void main(String[] args) {try {System.out.println("============================clientstart============================="); //1、创建Socket对象请求服务端连接 Socket socket = new Socket("127.0.0.1",9999);//2、从Socket对象获取一个字节输出流OutputStream os = socket.getOutputStream();//3、把字节输出流包装成一个打印流PrintStream ps = new PrintStream(os);ps.println("hello server");ps.flush();} catch (Exception e) {// TODO Auto-generated catch blocke.printStackTrace();}}}修改后(多发)serverpublic class server {public static void main(String[] args) {try {System.out.println("============================serverstart=============================");// 1、定义ServerSocket对象进行服务端口注册ServerSocket ss = new ServerSocket(9999);// 2、监听客户端的Socket连接对象Socket socket = ss.accept();// 3、从Socket管道中得到一个字节输入流对象InputStream is = socket.getInputStream();// 4、把字节输入流包装成一个字符输入流// BufferedInputStream bis = newBufferedInputStream(is);BufferedReader br = new BufferedReader(newInputStreamReader(is));String msg;while ((msg = br.readLine()) != null) {System.out.println("服务端接收到" + msg);}} catch (Exception e) {e.printStackTrace();}}}cleintpublic class client {public static void main(String[] args) {try {System.out.println("============================clientstart=============================");// 1、创建Socket对象请求服务端连接Socket socket = new Socket("127.0.0.1", 9999);// 2、从Socket对象获取一个字节输出流OutputStream os = socket.getOutputStream();// 3、把字节输出流包装成一个打印流PrintStream ps = new PrintStream(os);Scanner sc = new Scanner(System.in);while (true) {System.out.println("说:");String msg = sc.next();ps.println(msg);ps.flush();}} catch (Exception e) {// TODO Auto-generated catch blocke.printStackTrace();}}}
(4)Bio模式下接收多客户端
在上述的案例中,一个服务端只能接收一个客户端的通信请求,那么如果服务端需要处理很多个客户端的消息通信请求应该如何处理呢?,此时我们就需要在服务端引入线程了,也就是说客户端每发起一个请求,服务端就创建一个新的线程来处理这个客户端的请求,这样就实现了一个客户端一个线程的模型
实现多客户端连接BIO进行通信案例:
serverTreadpublic class serverThread extends Thread {private Socket socket;public serverThread(Socket socket) {this.socket = socket;}@Overridepublic void run() {try {//从socket对象中得到一个字节输入流InputStream is = socket.getInputStream();//使用缓存字符输入流包装字节输入流BufferedReader br= new BufferedReader(newInputStreamReader(is));String msg ;while((msg=br.readLine())!=null){System.out.println(msg);}} catch (IOException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}server/*** 服务端接收多个客户端 思路:服务端每接收一个客户端Socket请求对象之后都交给一个独立的线程处理客户端的数据交互需求** @author Style**/public class server {public static void main(String[] args) {System.out.println("============================serverstart=============================");try {//1、注册端口ServerSocket ss = new ServerSocket(9999);//2、定义一个死循环,负责不断的接收客户端的Socket连接请求while(true){Socket socket = ss.accept();//3、创建独立的线程来处理与这个客户端的Socket通信需求new serverThread(socket).start();}} catch (IOException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}cleintpublic class client {public static void main(String[] args) {System.out.println("============================clientstart=============================");try {// 1、创建Socket对象请求服务端连接Socket socket = new Socket("127.0.0.1", 9999);//2、得到一个打印流PrintStream ps = newPrintStream(socket.getOutputStream());//3、使用循环不带的发送消息给服务端接收Scanner sc = new Scanner(System.in);while(true){System.out.print("请说:");String msg =sc.next();ps.println(msg);ps.flush();}} catch (UnknownHostException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch (IOException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}
(5)总结:
- 每个Socket接收到,都会创建一个线程,线程的竞争、切换上下文影响性能;
- 每个线程都会占用栈空间和CPU资源;
- 并不是每个socket都进行IO操作,无意义的线程处理;
- 客户端的并发访问增加时。服务端将呈现1:1的线程开销,访问量越大,系统将发生线程栈溢出,线程创建失败,最终导致进程宕机或者僵死,从而不能对外提供服务。
4、伪异步I/O编程
(1)概述
在上述案例中:客户端的并发访问增加时。服务端将呈现1:1的线程开销,访问量越大,系统将发生线程栈溢出,线程创建失败,最终导致进程宕机或者僵死,从而不能对外提供服务。接下来我们采用一个伪异步I/O的通信框架,采用线程池和任务队列实现,当客户端接入时,将客户端的Socket封装成一个Task(该任务实现java.lang.Runnable线程任务接口)交给后端的线程池中进行处理。JDK的线程池维护一个消息队列和N个活跃的线程,对消息队列中Socket任务进行处理,由于线程池可以设置消息队列的大小和最大线程数,因此,它的资源占用是可控的,无论多少个客户端并发访问,都不会导致资源的耗尽和宕机。
(2)使用伪异步IO实现案例
实际就是使用线程池控制线程个数
serverpublic class server {public static void main(String[] args) {System.out.println("============================serverstart=============================");try { //1、注册端口ServerSocket ss = new ServerSocket(9999); //2、定义一个循环接收客户端的Socket连接请求//初始化一个线程池pretentIOPool pool = new pretentIOPool(3, 10);while(true){Socket socket = ss.accept();//3、把socket对象交给一个线程池进行处理//把Socket封装成一个任务对象交给线程池处理Runnable target = new pretendThread(socket);pool.execute(target);}} catch (IOException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}pretendThreadpublic class pretendThread implements Runnable {private Socket socket;public pretendThread(Socket socket) {this.socket = socket;}@Overridepublic void run() {try {//从socket对象中得到一个字节输入流InputStream is = socket.getInputStream();//使用缓存字符输入流包装字节输入流BufferedReader br= new BufferedReader(newInputStreamReader(is));String msg ;while((msg=br.readLine())!=null){System.out.println(msg);}} catch (IOException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}pretendIOPoolpublic class pretentIOPool {// 1、创建一个线程池的成员变量用于存储一个线程池对象private ExecutorService executorservice;// 2、创建类的对象的时候需要初始化线程池对象public pretentIOPool(int maxTreadNum, int queueSize) {executorservice = new ThreadPoolExecutor(3, maxTreadNum,120, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(queueSize));}// 3、提供一个方法来提交任务给线程池的任务队列来暂存,等着线程池来处理public void execute(Runnable target) {executorservice.execute(target);}}cleintpublic class client {public static void main(String[] args) {System.out.println("============================clientstart=============================");try {// 1、创建Socket对象请求服务端连接Socket socket = new Socket("127.0.0.1", 9999);//2、得到一个打印流PrintStream ps = newPrintStream(socket.getOutputStream());//3、使用循环不带的发送消息给服务端接收Scanner sc = new Scanner(System.in);while(true){System.out.print("请说:");String msg =sc.next();ps.println(msg);ps.flush();}} catch (UnknownHostException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch (IOException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}
(3)总结
- 伪异步io采用了线程池实现,因此避免了为每个请求创建一个独立线程造成线程资源耗尽的问题,但由于底层依然是采用的同步阻塞模型,因此无法从根本上解决问题。
- 如果单个消息处理的缓慢,或者服务器线程池中的全部线程都被阻塞,那么后续socket的i/o消息都将在队列中排队。新的Socket请求将被拒绝,客户端会发生大量连接超时。
5、基于BIO形式下的文件上传
案例:实现文件上传
server/*** 接收客户端任意类型文件** @author Style**/public class server {public static void main(String[] args) {try{System.out.println("============================serverstart=============================");//1、定义ServerSocket对象进行服务端口注册ServerSocket ss = new ServerSocket(9999);while(true){Socket socket = ss.accept();//交给一个独立线程处理FileTread fileTread = new FileTread(socket);Thread tread =new Thread(fileTread);tread.start(); //new Thread(new FileTread(socket)).start();}}catch(Exception e){e.printStackTrace();}}}cleint/*** 实现客户端文件上传* @author Style**/public class client {public static void main(String[] args) { try{System.out.println("============================clientstart=============================");//1、请求与服务器的socket连接Socket socket = new Socket("127.0.0.1",9999);//2、将字节输出流包装成一个数据输出流DataOutputStream dos = newDataOutputStream(socket.getOutputStream());//获取文件类型String path ="C:\\Users\\Style\\Pictures\\SavedPictures\\cat.jpg";String suffix = getSuffix(path);//3、先发送上传文件的后缀给服务器dos.writeUTF(suffix);//4、把文件数据发送给服务端进行接收InputStream is = new FileInputStream(path);byte[] buffer = new byte[1024];int len;while((len=is.read(buffer))>0){dos.write(buffer,0,len);}dos.flush();socket.shutdownOutput();//通知服务端已经发送完成 }catch(Exception e){e.printStackTrace(); }}//获取文件类型public static String getSuffix(String path){File file = new File(path);String name = file.getName();int index = name.lastIndexOf(".");String suffix = name.substring(index,name.length());return suffix;}}FileThreadpublic class FileTread implements Runnable {private Socket socket;public FileTread(Socket socket) {this.socket = socket;}@Overridepublic void run() {// TODO Auto-generated method stubtry {// 1、得到一个数据输入流读取客户端发送过来的数据DataInputStream dis = newDataInputStream(socket.getInputStream());// 2、读取客户端发送过来的文件类型String suffix = dis.readUTF();System.out.println("服务端已经接受到文件");// 3、定义一个字节输出管道负责客户端文件数据写出String outPath = "E:\\网络编程BIO NIO AIO\\讲义\\test\\" + UUID.randomUUID().toString() + suffix;OutputStream os = new FileOutputStream(outPath);// 4、从数据输入流读取文件数据,写出到字节输出流去byte[] buffer = new byte[1024];int len;while ((len = dis.read(buffer)) > 0) {os.write(buffer, 0, len);}os.close();System.out.println("上传文件成功");} catch (Exception e) {e.printStackTrace();}}}
6、BIO模式下的端口转发思想
需求:需要实现一个客户端的消息可以发送给所有的客户端去接收。(群聊实现)
实现服务端转发案例实现
serverpublic class server {// 定义一个静态集合public static List<Socket> allSocket=new ArrayList<Socket>();public static void main(String[] args) {try {System.out.println("============================server start=============================");ServerSocket ss = new ServerSocket(9999);while (true) {Socket socket = ss.accept();//把登录的客户端socket存入在线集合中allSocket.add(socket);//为当前登录的socket分配一个独立的线程处理与之通信new Thread(new serverThread(socket)).start();}} catch (Exception e) {e.printStackTrace();}}}serverThreadpublic class serverThread implements Runnable {private Socket socket;public serverThread(Socket socket) {this.socket = socket;}public void run() {try {//1.从socket中获取当前客户端的输入流BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()));String msg;while ((msg = br.readLine()) != null) {//2、服务端的接收到客户的消息之后,是需要推送给当前所有的在线socketsendMsgToAllClient(msg);}} catch (Exception e) {//当前有人下线,需要移除集合中下线的socketserver.allSocket.remove(socket);e.printStackTrace();//e.printStackTrace();}}/** * 把当前客户端发来的消息推送给全部在线的socket * * @param msg */private void sendMsgToAllClient(String msg) {for (Socket sk : server.allSocket) {try {PrintStream ps = new PrintStream(sk.getOutputStream());ps.println(msg);ps.flush();} catch (IOException e) {e.printStackTrace();}}}}
7、BIO实现聊天室私聊、群聊
思路:每当客户端发送消息给服务端,客户端则会先查看是否的单发还是多发,然后开始发送,随之服务端会接收客户端的消息与操作码,若是群发,则会转发给所有客户端,若是单发,则会遍厉在线集合,找到对应的Socket,写入当前操作码和想要发送的信息,然后转发给对应的客户端,而客户端接收这边则会通过判断接收的msg【错误提示信息】是否为“”,若不为“”,则带边是属于登录客户端,直接打开窗口就行,若为“”,则代表是接收信息,此时就要单独为Client开启一个线程去处理接收到的信息,具体步骤就是,获取当前的socket,读取输入流对象,获取传过来的参数,最后展示到界面,
serverChatpublic class serverChat {/** * 定义一个集合存储Socket * 在线集合只需要一个:存储客户端socket的同时还需要存储需要知道的socket客户端名称 * key:value ====socket:客户端名称 */public static Map<Socket,String> onLineSocket = new HashMap<>();public static void main(String[] args) { try { System.out.println("---------------server start---------------------"); ServerSocket ss = new ServerSocket(stringConfig.PORT); //循环一致等待所有可能的客户端连接 while (true){ //循环监听 Socket socket = ss.accept(); //把客户端socket管道单独配置一个线程来处理new Thread(new serverReader(socket)).start(); } }catch (Exception e){e.printStackTrace(); }}}serverReaderpublic class serverReader implements Runnable {private Socket socket;public serverReader(Socket socket) {this.socket = socket;}@Overridepublic void run() {DataInputStream dis = null;try {dis = new DataInputStream(socket.getInputStream());/** 1.循环一直等待客户端的消息 */while(true){/** 2.读取当前的消息类型 :登录,群发,私聊 , @消息 */int flag = dis.readInt();if(flag == 1){/** 先将当前登录的客户端socket存到在线人数的socket集合中 */String name = dis.readUTF() ;System.out.println(name+"-->"+socket.getRemoteSocketAddress());serverChat.onLineSocket.put(socket, name);}writeMsg(flag,dis);}} catch (Exception e) {System.out.println("--有人下线了--");// 从在线人数中将当前socket移出去serverChat.onLineSocket.remove(socket);try {// 从新更新在线人数并发给所有客户端writeMsg(1,dis);} catch (Exception e1) {e1.printStackTrace();}}}private void writeMsg(int flag, DataInputStream dis) throws Exception {//DataOutputStream dos = new DataOutputStream(socket.getOutputStream());// 定义一个变量存放最终的消息形式String msg = null ;if(flag == 1){/** 读取所有在线人数发给所有客户端去更新自己的在线人数列表 */StringBuilder rs = new StringBuilder();Collection<String> onlineNames = serverChat.onLineSocket.values();// 判断是否存在在线人数if(onlineNames != null && onlineNames.size() > 0){for(String name : onlineNames){rs.append(name+ stringConfig.SPILIT);}// 去掉最后的一个分隔符msg = rs.substring(0, rs.lastIndexOf(stringConfig.SPILIT));/** 将消息发送给所有的客户端 */sendMsgToAll(flag,msg);}}else if(flag == 2 || flag == 3){// 读到消息群发的 或者 @消息String newMsg = dis.readUTF() ; // 消息// 得到发件人String sendName = serverChat.onLineSocket.get(socket);//内容--StringBuilder msgFinal = new StringBuilder();// 时间SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss EEE");if(flag == 2){msgFinal.append(sendName).append("").append(sdf.format(System.currentTimeMillis()*2)).append("\r\n");msgFinal.append("").append(newMsg).append("\r\n");sendMsgToAll(flag,msgFinal.toString());}else if(flag == 3){msgFinal.append(sendName).append("").append(sdf.format(System.currentTimeMillis()*2)).append("对您私发\r\n");msgFinal.append("").append(newMsg).append("\r\n");// 私发// 得到给谁私发String destName = dis.readUTF();sendMsgToOne(destName,msgFinal.toString());}}}/** * @param destName 对谁私发 * @param msg 发的消息内容 * @throws Exception */private void sendMsgToOne(String destName, String msg) throws Exception {// 拿到所有的在线socket管道 给这些管道写出消息Set<Socket> allOnLineSockets = serverChat.onLineSocket.keySet();for(Socket sk :allOnLineSockets){// 得到当前需要私发的socket// 只对这个名字对应的socket私发消息if(serverChat.onLineSocket.get(sk).trim().equals(destName)){DataOutputStream dos = new DataOutputStream(sk.getOutputStream());dos.writeInt(2); // 消息类型dos.writeUTF(msg);dos.flush();}}}private void sendMsgToAll(int flag, String msg) throws Exception {// 拿到所有的在线socket管道 给这些管道写出消息Set<Socket> allOnLineSockets = serverChat.onLineSocket.keySet();for(Socket sk :allOnLineSockets){DataOutputStream dos = new DataOutputStream(sk.getOutputStream());dos.writeInt(flag); // 消息类型dos.writeUTF(msg);dos.flush();}}}clinetCatpublic class ClientChat implements ActionListener {/** 1.设计界面*/private JFrame win = new JFrame();/** 2.消息内容框架 */public JTextArea smsContent =new JTextArea(23 , 50);/** 3.发送消息的框*/private JTextArea smsSend = new JTextArea(4,40);/** 4.在线人数的区域*//** 存放人的数据 *//** 展示在线人数的窗口 */public JList<String> onLineUsers = new JList<>();// 是否私聊按钮private JCheckBox isPrivateBn = new JCheckBox("私聊");// 消息按钮private JButton sendBn= new JButton("发送");// 登录界面private JFrame loginView;private JTextField ipEt , nameEt , idEt;private Socket socket ;public static void main(String[] args) {System.out.println("---------------client start---------------------");new ClientChat().initView();}private void initView() {/** 初始化聊天窗口的界面 */win.setSize(650, 600);/** 展示登录界面*/displayLoginView();/** 展示聊天界面 *///displayChatView();}private void displayChatView() {JPanel bottomPanel = new JPanel(new BorderLayout());//-----------------------------------------------// 将消息框和按钮 添加到窗口的底端win.add(bottomPanel, BorderLayout.SOUTH);bottomPanel.add(smsSend);JPanel btns = new JPanel(new FlowLayout(FlowLayout.LEFT));btns.add(sendBn);btns.add(isPrivateBn);bottomPanel.add(btns, BorderLayout.EAST);//-----------------------------------------------// 给发送消息按钮绑定点击事件监听器// 将展示消息区centerPanel添加到窗口的中间smsContent.setBackground(new Color(0xdd,0xdd,0xdd));// 让展示消息区可以滚动。win.add(new JScrollPane(smsContent), BorderLayout.CENTER);smsContent.setEditable(false);//-----------------------------------------------// 用户列表和是否私聊放到窗口的最右边Box rightBox = new Box(BoxLayout.Y_AXIS);onLineUsers.setFixedCellWidth(120);onLineUsers.setVisibleRowCount(13);rightBox.add(new JScrollPane(onLineUsers));win.add(rightBox, BorderLayout.EAST);//-----------------------------------------------// 关闭窗口退出当前程序win.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);win.pack();// swing 加上这句 就可以拥有关闭窗口的功能/** 设置窗口居中,显示出来*/setWindowCenter(win,650,600,true);// 发送按钮绑定点击事件sendBn.addActionListener(this);}private void displayLoginView(){/** 先让用户进行登录 *服务端ip *用户名 *id **//** 显示一个qq的登录框 */loginView = new JFrame("登录");loginView.setLayout(new GridLayout(3, 1));loginView.setSize(400, 230);JPanel ip = new JPanel();JLabel label = new JLabel(" IP:");ip.add(label);ipEt = new JTextField(20);ip.add(ipEt);loginView.add(ip);JPanel name = new JPanel();JLabel label1 = new JLabel("姓名:");name.add(label1);nameEt = new JTextField(20);name.add(nameEt);loginView.add(name);JPanel btnView = new JPanel();JButton login = new JButton("登陆");btnView.add(login);JButton cancle = new JButton("取消");btnView.add(cancle);loginView.add(btnView);// 关闭窗口退出当前程序loginView.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);setWindowCenter(loginView,400,260,true);/** 给登录和取消绑定点击事件 */login.addActionListener(this);cancle.addActionListener(this);}private static void setWindowCenter(JFrame frame, int width , int height, boolean flag) {/** 得到所在系统所在屏幕的宽高 */Dimension ds = frame.getToolkit().getScreenSize();/** 拿到电脑的宽 */int width1 = ds.width;/** 高 */int height1 = ds.height ;System.out.println(width1 +"*" + height1);/** 设置窗口的左上角坐标 */frame.setLocation(width1/2 - width/2, height1/2 -height/2);frame.setVisible(flag);}@Overridepublic void actionPerformed(ActionEvent e) {/** 得到点击的事件源 */JButton btn = (JButton) e.getSource();switch(btn.getText()){case "登陆":String ip = ipEt.getText().toString();String name = nameEt.getText().toString();// 校验参数是否为空// 错误提示String msg = "" ;// 12.1.2.0// \d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\if(ip==null || !ip.matches("\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}")){msg = "请输入合法的服务端ip地址";}else if(name==null || !name.matches("\\S{1,}")){msg = "姓名必须1个字符以上";}if(!msg.equals("")){/** msg有内容说明参数有为空 */// 参数一:弹出放到哪个窗口里面JOptionPane.showMessageDialog(loginView, msg);}else{try {// 参数都合法了// 当前登录的用户,去服务端登陆/** 先把当前用户的名称展示到界面 */win.setTitle(name);// 去服务端登陆连接一个socket管道socket = new Socket(ip, stringConfig.PORT);//为客户端的socket分配一个线程 专门负责收消息new ClientReader(this,socket).start();// 带上用户信息过去DataOutputStream dos = new DataOutputStream(socket.getOutputStream());dos.writeInt(1); // 登录消息dos.writeUTF(name.trim());dos.flush();// 关系当前窗口 弹出聊天界面loginView.dispose(); // 登录窗口销毁displayChatView(); // 展示了聊天窗口了} catch (Exception e1) {e1.printStackTrace();}}break;case "取消":/** 退出系统 */System.exit(0);break;case "发送":// 得到发送消息的内容String msgSend = smsSend.getText().toString();if(!msgSend.trim().equals("")){/** 发消息给服务端 */try {// 判断是否对谁发消息String selectName = onLineUsers.getSelectedValue();int flag = 2 ;// 群发 @消息if(selectName!=null&&!selectName.equals("")){msgSend =("@"+selectName+","+msgSend);/** 判断是否选中了私法 */if(isPrivateBn.isSelected()){/** 私法 */flag = 3 ;//私发消息}}DataOutputStream dos = new DataOutputStream(socket.getOutputStream());dos.writeInt(flag); // 群发消息发送给所有人dos.writeUTF(msgSend);if(flag == 3){// 告诉服务端我对谁私发dos.writeUTF(selectName.trim());}dos.flush();} catch (Exception e1) {e1.printStackTrace();}}smsSend.setText(null);break;}}}clinetReaderclass ClientReader extends Thread {private Socket socket;private ClientChat clientChat ;public ClientReader(ClientChat clientChat, Socket socket) {this.clientChat = clientChat;this.socket = socket;}@Overridepublic void run() {try {DataInputStream dis = new DataInputStream(socket.getInputStream());/** 循环一直等待客户端的消息 */while(true){/** 读取当前的消息类型 :登录,群发,私聊 , @消息 */int flag = dis.readInt();if(flag == 1){// 在线人数消息回来了String nameDatas = dis.readUTF();// 展示到在线人数的界面String[] names = nameDatas.split(stringConfig.SPILIT);clientChat.onLineUsers.setListData(names);}else if(flag == 2){//群发,私聊 , @消息 都是直接显示的。String msg = dis.readUTF() ;clientChat.smsContent.append(msg);// 让消息界面滾動到底端clientChat.smsContent.setCaretPosition(clientChat.smsContent.getText().length());}}} catch (Exception e) {e.printStackTrace();}}}stringConfigpublic class stringConfig {//常量public static final int PORT = 9999;//协议分隔符public static final String SPILIT = "003197㏘④④";}
8、NIO介紹
(1)NIO简介
Java NIO(New IO)也有人称之为 java non-blocking IO是从Java 1.4版本开始引入的一个新的IO API,可以替代标准的Java IO API。NIO与原来的IO有同样的作用和目的,但是使用的方式完全不同,NIO支持面向缓冲区的、基于通道的IO操作。NIO将以更加高效的方式进行文件的读写操作。NIO可以理解为非阻塞IO,传统的IO的read和write只能阻塞执行,线程在读写IO期间不能干其他事情,比如调用socket.read()时,如果服务器一直没有数据传输过来,线程就一直阻塞,而NIO中可以配置socket为非阻塞模式。
- NIO 相关类都被放在 java.nio 包及子包下,并且对原 java.io 包中的很多类进行改写。
- NIO 有三大核心部分:Channel( 通道) ,Buffer( 缓冲区), Selector( 选择器)
- Java NIO 的非阻塞模式,使一个线程从某通道发送请求或者读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取,而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。 非阻塞写也是如此,一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做别的事情。
- 通俗理解:NIO 是可以做到用一个线程来处理多个操作的。假设有 1000 个请求过来,根据实际情况,可以分配20 或者 80个线程来处理。不像之前的阻塞 IO 那样,非得分配 1000 个。
(2)NIO与BIO比较
- BIO 以流的方式处理数据,而 NIO 以块的方式处理数据,块 I/O 的效率比流 I/O 高很多
- BIO 是阻塞的,NIO 则是非阻塞的
- BIO 基于字节流和字符流进行操作,而 NIO 基于 Channel(通道)和 Buffer(缓冲区)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector(选择器)用于监听多个通道的事件(比如:连接请求,数据到达等),因此使用单个线程就可以监听多个客户端通道
(3)NIO三大核心原理示意图
NIO 有三大核心部分Channel( 通道) ,Buffer( 缓冲区), Selector( 选择器)
(i)Buffer缓冲区
缓冲区本质上是一块可以写入数据,然后可以从中读取数据的内存。这块内存被包装成NIO Buffer对象,并提供了一组方法,用来方便的访问该块内存。相比较直接对数组的操作,Buffer API更加容易操作和管理。
(ii)Channel通道
Java NIO的通道类似流,但又有些不同:既可以从通道中读取数据,又可以写数据到通道。但流的(input或output)读写通常是单向的。 通道可以非阻塞读取和写入通道,通道可以支持读取或写入缓冲区,也支持异步地读写。
(iii)Select选择器【多路复用器】
Selector是 一个Java NIO组件,可以能够检查一个或多个 NIO 通道,并确定哪些通道已经准备好进行读取或写入。这样,一个单独的线程可以管理多个channel,从而管理多个网络连接,提高效率
每个 channel 都会对应一个 Buffer
- 一个线程对应Selector , 一个Selector对应多个 channel(连接)
- 程序切换到哪个 channel 是由事件决定的
- Selector 会根据不同的事件,在各个通道上切换
- Buffer 就是一个内存块 , 底层是一个数组
- 数据的读取写入是通过 Buffer完成的 , BIO 中要么是输入流,或者是输出流, 不能双向,但是 NIO 的 Buffer 是可以读也可以写。
- NIO系统的核心在于:通道(Channel)和缓冲区 (Buffer)。通道表示打开到 IO 设备(例如:文件、 套接字)的连接。若需要使用 NIO 系统,需要获取 用于连接 IO 设备的通道以及用于容纳数据的缓冲 区。然后操作缓冲区,对数据进行处理。简而言之,Channel 负责传输, Buffer 负责存取数据
10、NIO缓冲区(Buffer)详讲
(1)Buffer介绍
一个用于特定基本数据类型的容器。由 java.nio 包定义的,所有缓冲区 都是 Buffer 抽象类的子类.。Java NIO 中的 Buffer 主要用于与 NIO 通道进行交互,数据是从通道读入缓冲区,从缓冲区写入通道中的
(2)Buffer类及其子类
Buffer 就像一个数组,可以保存多个相同类型的数据。根据数据类型不同 ,有以下 Buffer 常用子类:
- ByteBuffer
- CharBuffer
- ShortBuffer
- IntBuffer
- LongBuffer
- FloatBuffer
- DoubleBuffer
上述 Buffer 类 他们都采用相似的方法进行管理数据,只是各自管理的数据类型不同而已。都是通过如下方法获取一个 Buffer 对象:
static XxxBuffer allocate(int capacity) : 创建一个容量为capacity 的 XxxBuffer 对象
(3)缓冲区的基本属性
Buffer中的重要概念: - 容量 (capacity) :作为一个内存块,Buffer具有一定的固定大小,也称为”容量”,缓冲区容量不能为负,并且创建后不能更改。
- 限制 (limit):表示缓冲区中可以操作数据的大小(limit 后数据不能进行读写)。缓冲区的限制不能为负,并且不能大于其容量。 写入模式,限制等于buffer的容量。读取模式下,limit等于写入的数据量。
- 位置 (position):下一个要读取或写入的数据的索引。缓冲区的位置不能为负,并且不能大于其限制
- 标记 (mark)与重置 (reset):标记是一个索引,通过 Buffer 中的 mark() 方法 指定 Buffer 中一个特定的 position,之后可以通过调用 reset() 方法恢复到这 个 position.标记、位置、限制、容量遵守以下不变式: 0 <= mark <= position <= limit <= capacity
(4)Buffer常用方法
(5)缓冲区的数据操作
Buffer 所有子类提供了两个用于数据操作的方法:get()put() 方法取获取 Buffer中的数据
(6)使用Buffer遵循的步骤
- 写入数据到Buffer
- 调用flip()方法,转换为读取模式
- 从Buffer中读取数据
- 调用buffer.clear()方法或者buffer.compact()方法清除缓冲区
(7)案例演示方法使用
/*** 对缓冲区Buffer的常用API进行案例实现** @author Style**/public class bufferTest {public static void main(String[] args) {//1.分配一个缓冲区,容量设置成10ByteBuffer buffer = ByteBuffer.allocate(10);System.out.println(buffer.position()); //0System.out.println(buffer.limit());//10System.out.println(buffer.capacity()); //10System.out.println("=========================================================================");//2.向缓冲区添加数据String name ="JackStyle";buffer.put(name.getBytes());System.out.println(buffer.position()); //9位置System.out.println(buffer.limit());//10 限制System.out.println(buffer.capacity()); //10 容量System.out.println("=========================================================================");//3.Buffer flip()为将缓冲区的界限设置为当前位置,并将当前位置设置到0【可读模式】buffer.flip();System.out.println(buffer.position()); //0System.out.println(buffer.limit());//9System.out.println(buffer.capacity()); //10System.out.println("=========================================================================");//4.get数据读取System.out.println((char)buffer.get());System.out.println(buffer.position()); //1System.out.println(buffer.limit());//9System.out.println(buffer.capacity()); //10//5.清空缓存区:但需要注意,就算清空了也数据还在,其只是把位置挪到第0位,需要后序存储数据将其覆盖 buffer.clear();System.out.println(buffer.position()); //0System.out.println(buffer.limit());//10System.out.println(buffer.capacity()); //10System.out.println("=========================================================================");ByteBuffer buf = ByteBuffer.allocate(10);String n = "JackStyle";buf.put(n.getBytes());//初始化位置到0buf.flip();byte[] b = new byte[5];buf.get(b);System.out.println(new String(b));System.out.println(buf.position()); //5System.out.println(buf.limit());//9System.out.println(buf.capacity()); //10System.out.println("=========================================================================");//5.标志位置buf.mark();//标志内置位5byte[]b2 = new byte[3];buf.get(b2);System.out.println(new String(b2));System.out.println(buf.position()); //8System.out.println(buf.limit());//9System.out.println(buf.capacity()); //10System.out.println("=========================================================================");buf.reset();//回到标记位if(buf.hasRemaining()){System.out.println(buf.remaining());//剩余位}}}
(8)直接与非直接缓冲区
什么是直接内存与非直接内存?
根据官方文档的描述:
byte byffer可以是两种类型,一种是基于直接内存(也就是非堆内存);另一种是非直接内存(也就是堆内存)。对于直接内存来说,JVM将会在IO操作上具有更高的性能,因为它直接作用于本地系统的IO操作。而非直接内存,也就是堆内存中的数据,如果要作IO操作,会先从本进程内存复制到直接内存,再利用本地IO处理。
从数据流的角度,
非直接内存作用链:本地IO–>直接内存–>非直接内存–>直接内存–>本地IO
直接内存作用链:本地IO–>直接内存–>本地IO
很明显,在做IO处理时,比如网络发送大量数据时,直接内存会具有更高的效率。直接内存使用allocateDirect创建,但是它比申请普通的堆内存需要耗费更高的性能。不过,这部分的数据是在JVM之外的,因此它不会占用应用的内存。所以呢,当你有很大的数据要缓存,并且它的生命周期又很长,那么就比较适合使用直接内存。只是一般来说,如果不是能带来很明显的性能提升,还是推荐直接使用堆内存。字节缓冲区是直接缓冲区还是非直接缓冲区可通过调用其 isDirect() 方法来确定。
public static void main(String[] args) {
//申请一个直接缓冲区
ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
System.out.println(buffer.isDirect());
System.out.println(“=======================================================”);
ByteBuffer buffer1 = ByteBuffer.allocate(1024);
System.out.println(buffer1.isDirect());
}
直接缓存使用场景有很大的数据需要存储,它的生命周期又很长
适合频繁的IO操作,比如网络并发场景
11、NIO通道(Channel)详讲
(1)Cannel概述
通道(Channel):由 java.nio.channels 包定义 的。Channel 表示 IO 源与目标打开的连接。 Channel 类似于传统的“流”。只不过 Channel 本身不能直接访问数据,Channel 只能与 Buffer 进行交互。
(i) NIO 的通道类似于流,但有些区别如下:通道可以同时进行读写,而流只能读或者只能写
通道可以实现异步读写数据
通道可以从缓冲读数据,也可以写数据到缓冲:
(ii)BIO 中的 stream 是单向的,例如 FileInputStream 对象只能进行读取数据的操作,而 NIO 中的通道(Channel)是双向的,可以读操作,也可以写操作。
(iii)Channel 在 NIO 中是一个接口
public interface Channel extends Closeable{}
(2)常见Channel实现类FileChannel:用于读取、写入、映射和操作文件的通道。
DatagramChannel:通过 UDP 读写网络中的数据通道。
SocketChannel:通过 TCP 读写网络中的数据。
ServerSocketChannel:可以监听新进来的 TCP 连接,对每一个新进来的连接都会创建一个 SocketChannel。 【ServerSocketChanne 类似 ServerSocket , SocketChannel 类似 Socket】
(3)FileChannel 类
获取通道的一种方式是对支持通道的对象调用getChannel() 方法。支持通道的类如下:FileInputStream
FileOutputStream
RandomAccessFile
DatagramSocket
Socket
ServerSocket
获取通道的其他方式是使用 Files 类的静态方法 newByteChannel() 获取字节通道。或者通过通道的静态方法 open() 打开并返回指定通道
(4)FileChannel的常用方法int read(ByteBuffer dst) 从 Channel 到 中读取数据到 ByteBuffer
long read(ByteBuffer[] dsts) 将 Channel 到 中的数据“分散”到 ByteBuffer[]
int write(ByteBuffer src) 将 ByteBuffer 到 中的数据写入到 Channel
long write(ByteBuffer[] srcs) 将 ByteBuffer[] 到 中的数据“聚集”到 Channel
long position() 返回此通道的文件位置
FileChannel position(long p) 设置此通道的文件位置
long size() 返回此通道的文件的当前大小
FileChannel truncate(long s) 将此通道的文件截取为给定大小
void force(boolean metaData) 强制将所有对此通道的文件更新写入到存储设备中
12、NIO案例实现
(1)使用NIO通道实现数据写入或读取文件
write
@Testpublic void write() {try {//1.字节输出流通向目标文件FileOutputStream fos = new FileOutputStream("data.txt");//2.得到字节输出流对应的通道FileChannel channel = fos.getChannel();//3.分配缓存区ByteBuffer buffer = ByteBuffer.allocate(1024);buffer.put("hello nio channel".getBytes());//4.切换成写出模式buffer.flip();channel.write(buffer);System.out.println("写数据到文件完成");} catch (FileNotFoundException e) {e.printStackTrace();} catch (IOException e) {e.printStackTrace();}}
read
@Testpublic void read() {try {//1.定义一个文件字节输入流与原文件接通FileInputStream fis = new FileInputStream("data.txt");//2.得到文件字节输入流的文件通道FileChannel channel = fis.getChannel();//3.定义一个缓冲区ByteBuffer buffer = ByteBuffer.allocate(1024);//4、直接读取数据到缓冲区channel.read(buffer);//5.读取缓冲区数据并输出【注意:当要读取数据时,需要将缓冲区位置设置到0,然后从头读到尾】buffer.flip();System.out.println(new String(buffer.array(),0,buffer.remaining()));} catch (FileNotFoundException e) {e.printStackTrace();} catch (IOException e) {e.printStackTrace();}}
(2)使用Buffer完成文件复制
@Testpublic void copyFile() throws IOException {//1.获取源文件流File file = new File("E:\\网络编程BIO NIO AIO\\讲义\\cat.jpg");File fileto = new File("E:\\网络编程BIO NIO AIO\\讲义\\test\\" + UUID.randomUUID() + ".jpg");//2.得到一个字节输出流和字节输入流FileInputStream fis = new FileInputStream(file);FileOutputStream fos = new FileOutputStream(fileto);//3.得到的是文件通道FileChannel fischannel = fis.getChannel();FileChannel foschannel = fos.getChannel();//4.分配缓冲区ByteBuffer buffer = ByteBuffer.allocate(1024);while (true) {//必须清空缓存区buffer.clear();//开始读取一次数据int flag = fischannel.read(buffer);if (flag == -1) {break;}//已经读取了数据,把缓冲区的模式切换成可读模式buffer.flip();//把数据写出foschannel.write(buffer);}fischannel.close();foschannel.close();System.out.println("复制完成");}
(3)分散(Scatter)和聚集(Gather)
- 分散读取(Scatter ):是指把Channel通道的数据读入到多个缓冲区中去
- 聚集写入(Gathering )是指将多个 Buffer 中的数据“聚集”到 Channel。
@Testpublic void test01() throws IOException {//1.字节输入管道FileInputStream fis = new FileInputStream("data01.txt");FileChannel fisChannel = fis.getChannel();//2.字节输入流管道FileOutputStream fos = new FileOutputStream("data02.txt");FileChannel fosChannel = fos.getChannel();//3.定义多个缓冲区做数据分散ByteBuffer buffer01 = ByteBuffer.allocate(4);ByteBuffer buffer02 = ByteBuffer.allocate(1024);ByteBuffer[] buffers = {buffer01, buffer02};//4.从通道中读取数据坟山到各缓冲区 fisChannel.read(buffers);//5.从每个缓冲区中查询是否有数据读取到for(ByteBuffer buffer : buffers){buffer.flip();//切换到读数据模式System.out.println(new String(buffer.array(),0,buffer.remaining()));}//6.聚集写入到通道fosChannel.write(buffers);fisChannel.close();fosChannel.close();System.out.println("文件复制完成");}
(4)transferFrom()与transferTo()
- transferFrom()
作用:从目标通道中去复制原通道数据
@Testpublic void test03() throws IOException {//1.字节输入管道FileInputStream fis = new FileInputStream("data01.txt");FileChannel fisChannel = fis.getChannel();//2.字节输入流管道FileOutputStream fos = new FileOutputStream("data03.txt");FileChannel fosChannel = fos.getChannel();//3.复制数据fosChannel.transferFrom(fisChannel, fisChannel.position(), fisChannel.size());fisChannel.close();fosChannel.close();LOGGER.info("复制文件完成");}
- transferTo()
作用:把原通道数据复制到目标通道
@Testpublic void test04() throws IOException {//1.字节输入管道FileInputStream fis = new FileInputStream("data01.txt");FileChannel fisChannel = fis.getChannel();//2.字节输入流管道FileOutputStream fos = new FileOutputStream("data04.txt");FileChannel fosChannel = fos.getChannel();//3.复制数据fisChannel.transferTo(fisChannel.position(), fisChannel.size(),fosChannel);fisChannel.close();fosChannel.close();LOGGER.info("复制文件完成");}
13、NIO选择器【多路复用器】Selector
(1)概述
选择器(Selector) 是 SelectableChannle 对象的多路复用器,Selector 可以同时监控多个 SelectableChannel 的 IO 状况,也就是说,利用 Selector可使一个单独的线程管理多个 Channel。Selector 是非阻塞 IO 的核心
- Java 的 NIO,用非阻塞的 IO 方式。可以用一个线程,处理多个的客户端连接,就会使用到 Selector(选择器)
- Selector 能够检测多个注册的通道上是否有事件发生(注意:多个 Channel 以事件的方式可以注册到同一个
Selector),如果有事件发生,便获取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程去管
理多个通道,也就是管理多个连接和请求。 - 只有在 连接/通道 真正有读写事件发生时,才会进行读写,就大大地减少了系统开销,并且不必为每个连接都
创建一个线程,不用去维护多个线程 - 避免了多线程之间的上下文切换导致的开销
(2)selector应用
步骤: - 创建 Selector :通过调用 Selector.open() 方法创建一个 Selector。
Selector selector = Selector.open();
- 向选择器注册通道:SelectableChannel.register(Selector sel, int ops)
//1. 获取通道ServerSocketChannel ssChannel = ServerSocketChannel.open();//2. 切换非阻塞模式ssChannel.configureBlocking(false);//3. 绑定连接ssChannel.bind(new InetSocketAddress(9898));//4. 获取选择器Selector selector = Selector.open();//5. 将通道注册到选择器上, 并且指定“监听接收事件”ssChannel.register(selector, SelectionKey.OP_ACCEPT);
当调用 register(Selector sel, int ops) 将通道注册选择器时,选择器对通道的监听事件,需要通过第二个参数 ops 指定。可以监听的事件类型(用 可使用 SelectionKey 的四个常量 表示):
- 读 : SelectionKey.OP_READ (1)
- 写 : SelectionKey.OP_WRITE (4)
- 连接 : SelectionKey.OP_CONNECT (8)
- 接收 : SelectionKey.OP_ACCEPT (16)
- 若注册时不止监听一个事件,则可以使用“位或”操作符连接。
int interestSet = SelectionKey.OP_READ|SelectionKey.OP_WRITE
14、NIO非阻塞式网络通信原理分析
(1)Selector 示意图和特点说明
Selector可以实现: 一个 I/O 线程可以并发处理 N 个客户端连接和读写操作,这从根本上解决了传统同步阻塞 I/O 一连接一线程模型,架构的性能、弹性伸缩能力和可靠性都得到了极大的提升。
(2)服务端流程
- 当客户端连接服务端时,服务端会通过 ServerSocketChannel 得到 SocketChannel:1. 获取通道
ServerSocketChannel ssChannel = ServerSocketChannel.open();
- 切换非阻塞模式
ssChannel.configureBlocking(false);
- 绑定连接
ssChannel.bind(new InetSocketAddress(9999));
- 获取选择器
Selector selector = Selector.open();
- 将通道注册到选择器上, 并且指定“监听接收事件”
ssChannel.register(selector, SelectionKey.OP_ACCEPT);
- 轮询式的获取选择器上已经“准备就绪”的事件
//轮询式的获取选择器上已经“准备就绪”的事件 while (selector.select() > 0) {System.out.println("轮一轮");//7. 获取当前选择器中所有注册的“选择键(已就绪的监听事件)”Iterator<SelectionKey> it = selector.selectedKeys().iterator();while (it.hasNext()) {//8. 获取准备“就绪”的是事件SelectionKey sk = it.next();//9. 判断具体是什么事件准备就绪if (sk.isAcceptable()) {//10. 若“接收就绪”,获取客户端连接SocketChannel sChannel = ssChannel.accept();//11. 切换非阻塞模式sChannel.configureBlocking(false);//12. 将该通道注册到选择器上sChannel.register(selector, SelectionKey.OP_READ);} else if (sk.isReadable()) {//13. 获取当前选择器上“读就绪”状态的通道SocketChannel sChannel = (SocketChannel) sk.channel();//14. 读取数据ByteBuffer buf = ByteBuffer.allocate(1024);int len = 0;while ((len = sChannel.read(buf)) > 0) {buf.flip();System.out.println(new String(buf.array(), 0, len));buf.clear();}}//15. 取消选择键 SelectionKeyit.remove();}}}
(3)客户端流程
- 获取通道
SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9999));
- 切换非阻塞模式
sChannel.configureBlocking(false);
- 分配指定大小的缓冲区
ByteBuffer buf = ByteBuffer.allocate(1024);
- 发送数据给服务端
Scanner scan = new Scanner(System.in);while(scan.hasNext()){String str = scan.nextLine();buf.put((new SimpleDateFormat("yyyy/MM/dd HH:mm:ss").format(System.currentTimeMillis())+ "\n" + str).getBytes());buf.flip();sChannel.write(buf);buf.clear();}//关闭通道sChannel.close();
(4)案例演示
- 实现服务端接收客户端的信息
serverpublic class server {public static void main(String[] args) throws IOException {System.out.println("==============================server start============================");//1.获取通道ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();//2.切换为非阻塞模式serverSocketChannel.configureBlocking(false);//3.绑定连接端口serverSocketChannel.bind(new InetSocketAddress(9999));//4.获取选择器Selector selector = Selector.open();//5.将通道注册到选择器上,并开始指定监听事件serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//6.使用select选择器轮询就绪事件while(selector.select()>0){//7.获取选择器中所有注册的通道中已经就绪好的事件Iterator<SelectionKey> it = selector.selectedKeys().iterator();//8.开始遍历准备好的事件while(it.hasNext()){//提取当前这个事件SelectionKey sk = it.next();//9.判断事件具体是什么if(sk.isAcceptable()){//10.直接获取当前接入的客户端通道SocketChannel schannel = serverSocketChannel.accept();//11.切换成非阻塞模式schannel.configureBlocking(false);//12.将本客户端通道注册到选择器schannel.register(selector,SelectionKey.OP_READ);}else if(sk.isReadable()){//13.当选择器上的读就绪事件SocketChannel schannel = (SocketChannel) sk.channel();//14.读取数据ByteBuffer buffer= ByteBuffer.allocate(1024);int len = 0;while((len=schannel.read(buffer))>0){buffer.flip();System.out.println(new String(buffer.array(),0,len));buffer.clear();}}//15.处理完毕之后需要移除当前事件it.remove();}}}}clientpublic class client {public static void main(String[] args) throws IOException {System.out.println("==============================client start============================");//1.获取通道SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1",9999));//2.切换非阻塞模式sChannel.configureBlocking(false);//3.分配缓存区大小ByteBuffer buffer=ByteBuffer.allocate(1024);//4.发送数据给服务器Scanner sc = new Scanner(System.in);while (true){System.out.print("请说:");String msg = sc.next();System.out.println();buffer.put(("Jackstyle:"+msg).getBytes());buffer.flip();sChannel.write(buffer);buffer.clear();}}}
- 使用NIO实现聊天室发送消息给全部在线客户端
serverpublic class server {//1.定义一些成员属性:选择器、服务端通道、端口private Selector selector;private ServerSocketChannel sChannel;private final int PORT = 9999;private final Logger LOGGER = Logger.getLogger("server");//2.定义初始化代码逻辑public server() throws IOException {//a.创建选择器对象selector = Selector.open();//b.获取通道sChannel = ServerSocketChannel.open();//c.绑定客户端连接的端口sChannel.bind(new InetSocketAddress(PORT));//d.设置非阻塞通信模式sChannel.configureBlocking(false);//e.把通道注册到选择器上去,并且开始指定接收连接事件sChannel.register(selector, SelectionKey.OP_ACCEPT);LOGGER.info("服务端开启");}public static void main(String[] args) throws IOException {//创建服务端对象server server = new server();//开始监听客户端各种消息事件:连接、群聊消息、离线消息server.listen();}/** * 监听逻辑 */private void listen() {try {while (selector.select() > 0) {//a.获取选择器中所有注册通道的就绪事件Iterator<SelectionKey> it = selector.selectedKeys().iterator();//b.开始遍历事件while (it.hasNext()) {//提取事件SelectionKey sk = it.next();//判断这个事件的类型if (sk.isAcceptable()) {//客户端接入请求//获取客户端通道SocketChannel socketChannel = sChannel.accept();//注册为非阻塞模式socketChannel.configureBlocking(false);//注册给选择器,监听读数据的事件socketChannel.register(selector, SelectionKey.OP_READ);} else if (sk.isReadable()) {//处理这个客户端的消息,接收它然后实现转发逻辑readClientData(sk);}it.remove();}}} catch (Exception e) {}}/** * 接收当前客户端通道信息,转发给其他全部客户顿通道 * * @param sk */private void readClientData(SelectionKey sk) throws IOException {SocketChannel socketChannel = null;try {//直接得到当前客户端通道socketChannel = (SocketChannel) sk.channel();//创建缓冲区开始接收客户端数据ByteBuffer buffer = ByteBuffer.allocate(1024);int count = socketChannel.read(buffer);if (count > 0) {buffer.flip();//提取读取到的信息String msg = new String(buffer.array(), 0, buffer.remaining());LOGGER.info("接收到客户端消息");//把这个消息推送给全部客户端接收sendMsgToAllClient(msg, socketChannel);}} catch (Exception e) {LOGGER.info("有人离线" + socketChannel.getRemoteAddress());//当前客户端离线sk.cancel();socketChannel.close();}}/** * 将消息推送给所有人 * * @param msg * @param socketChannel */private void sendMsgToAllClient(String msg, SocketChannel socketChannel) throws IOException {LOGGER.info("服务端开始转发消息,当前处理的线程是" + Thread.currentThread().getName());for (SelectionKey key : selector.keys()) {Channel channel =key.channel();//剔除自己通道,不将数据发送给自己if (channel instanceof SocketChannel && socketChannel != channel) { ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());((SocketChannel)channel).write(buffer);}}}}clientpublic class client {//1.定义客户端相关属性private Selector selector;private final int PORT =9999;private SocketChannel socketChannel;private final Logger LOGGER = Logger.getLogger("client");public client() throws IOException {selector = Selector.open();socketChannel= SocketChannel.open(new InetSocketAddress("127.0.0.1",PORT));socketChannel.configureBlocking(false);socketChannel.register(selector, SelectionKey.OP_READ);LOGGER.info("客户端开启");}public static void main(String[] args) throws IOException {client client= new client();//定义一个线程专门负责监听服务端发送过来的读消息new Thread(new Runnable() {@Overridepublic void run() {try {client.readInfo();} catch (IOException e) {e.printStackTrace();}}}).start();Scanner sc = new Scanner(System.in);while (sc.hasNext()){String s = sc.next();client.sendToServer(s);}}/** * 向客户端发送消息 * @param s */private void sendToServer(String s) throws IOException {socketChannel.write(ByteBuffer.wrap(("Gogo spack"+s).getBytes()));}/** * 读取客户端发送的信息 */private void readInfo() throws IOException { while(selector.select()>0){ Iterator<SelectionKey> it = selector.selectedKeys().iterator(); while(it.hasNext()){ SelectionKey sk = it.next(); if(sk.isReadable()){ SocketChannel sc = (SocketChannel) sk.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); sc.read(buffer); System.out.println(new String(buffer.array()).trim()); } it.remove(); } }}}
15、AIO详讲
(1)AIO介绍
Java AIO(NIO.2) : 异步非阻塞,服务器实现模式为一个有效请求一个线程,客户端的I/O请求都是由OS先完成了再通知服务器应用去启动线程进行处理。
(2)AIO、BIO、NIO比较
Aio异步非阻塞,基于NIO的,可以称之为NIO2.0
与NIO不同,当进行读写操作时,只须直接调用API的read或write方法即可, 这两种方法均为异步的,对于读操作而言,当有流可读取时,操作系统会将可读的流传入read方法的缓冲区,对于写操作而言,当操作系统将write方法传递的流写入完毕时,操作系统主动通知应用程序
即可以理解为,read/write方法都是异步的,完成后会主动调用回调函数。在JDK1.7中,这部分内容被称作NIO.2,主要在Java.nio.channels包下增加了下面四个异步通道:
AsynchronousSocketChannel
AsynchronousServerSocketChannel
AsynchronousFileChannel
AsynchronousDatagramChannel
16、总结
- Java BIO : 同步并阻塞,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,当然可以通过线程池机制改善。
- Java NIO : 同步非阻塞,服务器实现模式为一个请求一个线程,即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有I/O请求时才启动一个线程进行处理。
- Java AIO(NIO.2) : 异步非阻塞,服务器实现模式为一个有效请求一个线程,客户端的I/O请求都是由OS先完成了再通知服务器应用去启动线程进行处理。
后续学习【Netty】是通信架构的框架