🚀1. Java BIO 介绍
🎁Java BIO(Java Blocking I/O)是 Java 网络编程所支持的一种传统阻塞型 I/O 模型,服务器实现模式为一个连接一个线程,当客户端有连接请求时,服务器就启动一个线程进行处理,如果这个连接不做任何事情则会造成不必要的线程开销,此时可以通过线程池机制改善(实现多个客户连接服务器)。
🎉Java BIO 工作机制
🎈Client 客户端分为两个过程
1、通过 Socket 对象请求与服务端的连接
2、从 Socket 中得到字节输入或者是字节输出流进行数据的读写操作
🎈Server 服务端分为三个过程
1、通过 ServerSocket 注册端口
2、服务端通过调用 accpet 方法用于监听客户端的 Socket 请求
3、从 Socket 中获取字节输入或者输出流进行数据的读写操作
🚀2. BIO 模式下多发和多收消息
🎉在传统的同步阻塞模型开发中,服务端 ServerSocket 负责绑定 IP 地址,启动监听端口;客户端 Socket 负责发起连接操作,连接成功后双方通过输入和输出流进行同步阻塞式通信,基于 BIO 模式的通信,客户端-服务端是完全同步、完全耦合的。
✨下面将首先给出一个传统的 BIO 应用实例,客户端代码如下
public class Client { public static void main(String[] args) { try { //1.创建Socket对象请求服务端的连接 Socket socket = new Socket("127.0.0.1", 9000); //2.从Socket对象中获取一个字节输出流 OutputStream os = socket.getOutputStream(); //3.把字节输出流包装成一个打印流 PrintStream ps = new PrintStream(os); ps.println("hello world! 服务端,你好"); ps.flush(); } catch (UnknownHostException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } }
✨服务端代码如下
public class Server { public static void main(String[] args) { try { System.out.println("-----------服务端请求-------------"); //1.定义一个ServerSocket对象请求进行服务端的端口注册 ServerSocket ss = new ServerSocket(9000); //2.监听客户端的Socket连接请求 Socket sock = ss.accept(); //3.从socket管道中得到一个字节输入流对象 InputStream is = sock.getInputStream(); //4.把字节输入流包装成一个缓存字符输入流 BufferedReader br = new BufferedReader(new InputStreamReader(is)); String msg; if ((msg = br.readLine()) != null) { System.out.println("服务端接收到:"+msg); } } catch (IOException e) { e.printStackTrace(); } } }
运行结果如下
-----------服务端请求------------- 服务端接收到:hello world! 服务端,你好
🎉从上面的的案例可以看到,在通信过程中,服务端会一直等待客户端的消息,如果客户端没有进行消息的发送,服务端将一直处于阻塞状态。同时服务端是按行获取消息的,这意味着客户端也必须按照行发送消息,否则服务端将进入等待消息的阻塞状态。
🎉那么,在上面的案例中可以看到,只能实现客户端发送消息,服务端接收消息,并不能实现客户端多发以及服务端多收消息的功能。为了解决这个问题,只需要修改客户端和服务端的发送和接收消息的逻辑即可,案列代码如下。
✨客户端代码如下
public class Client { public static void main(String[] args) { try { //1.创建Socket对象请求服务端的连接 Socket socket = new Socket("127.0.0.1", 9000); //2.从Socket对象中获取一个字节输出流 OutputStream os = socket.getOutputStream(); //3.把字节输出流包装成一个打印流 PrintStream ps = new PrintStream(os); Scanner sc = new Scanner(System.in); while (true) { System.out.println("请说:"); String msg = sc.nextLine(); ps.println(msg); ps.flush(); } } catch (IOException e) { e.printStackTrace(); } } }
✨服务端代码如下
public class Server { public static void main(String[] args) { try { //1.定义一个ServerSocket对象请求进行服务端的端口注册 ServerSocket ss = new ServerSocket(9000); //2.监听客户端的Socket连接请求 Socket sock = ss.accept(); //3.从socket管道中得到一个字节输入流对象 InputStream is = sock.getInputStream(); //4.把字节输入流包装成一个缓存字符输入流 BufferedReader br = new BufferedReader(new InputStreamReader(is)); String msg; while ((msg = br.readLine()) != null) { System.out.println("服务端接收到:"+msg); } } catch (IOException e) { e.printStackTrace(); } } }
✨运行结果如下
-------------客户端---------------: 请说:hello 请说:What are you doing? 请说:Do you enjoy your holiday? -------------服务端---------------: 服务端接收到:hello 服务端接收到:What are you doing? 服务端接收到:Do you enjoy your holiday?
🚀3. BIO 模式下服务端接收多个客户端通信请求
🎉在上一章节的案例中,一个服务端只能接收一个客户端的通信请求,如果服务端需要接收多个客户端的通信请求,应该如何处理呢?此时,就需要在服务端中引入线程了,也就是说客户端每发起一个请求,服务端就创建一个新的线程来处理这个客户端的请求,这个就实现了一个客户端一个线程的模型。
✨下图为多个客户端与服务端的通信模式图解
✨下面将给出 BIO 模式下服务端接收多个客户端通信请求的案例,客户端代码如下
public class Client { public static void main(String[] args) { try { //1.创建Socket对象请求服务端的连接 Socket socket = new Socket("127.0.0.1", 9000); //2.从Socket对象中获取一个字节输出流 OutputStream os = socket.getOutputStream(); //3.把字节输出流包装成一个打印流 PrintStream ps = new PrintStream(os); Scanner sc = new Scanner(System.in); //4.使用循环不断的发送消息给服务端接收 while (true) { System.out.print("请说:"); String msg = sc.nextLine(); ps.println(msg); ps.flush(); } } catch (IOException e) { e.printStackTrace(); } } }
✨服务端代码如下
//目标:服务端可以实现同时接收多个客户端的socket请求 //思路:服务端每接收到一个客户端socket请求对象之后都交给一个独立的线程来处理客户端的数据交互请求 public class Server { public static void main(String[] args) { try { //1.定义一个ServerSocket对象请求进行服务端的端口注册 ServerSocket ss = new ServerSocket(9000); //2.使用循环不断的接收客户端的socket通信请求 while (true) { Socket socket = ss.accept(); new ServerThreadReader(socket).start(); } } catch (IOException e) { e.printStackTrace(); } } }
✨ServerThreadReader 线程类实现代码如下
public class ServerThreadReader extends Thread { private Socket socket; public ServerThreadReader(Socket socket) { this.socket = socket; } @Override public void run() { try { //从socket对象中得到一个字节输入流 InputStream is = socket.getInputStream(); //使用缓存字符输入流包装字节输入流 BufferedReader br = new BufferedReader(new InputStreamReader(is)); String msg; while ((msg = br.readLine()) != null) { System.out.println(msg); } } catch (IOException e) { e.printStackTrace(); } } }
✨运行结果如下
-----------客户端1------------- 请说:我是客户端1 -----------客户端2------------- 请说:我是客户端2 -----------客户端3------------- 请说:我是客户端3 -----------服务端------------- 我是客户端1 我是客户端2 我是客户端3
✨如何模拟多个客户端发送请求呢?很简单,只需要将 Client 类复制多份,例如 Client1、Client2、Client3,再同时运行即可。
🎉对于上面的案例做一个总结:
🎈每个 socket 接收到通信请求,都会创建一个线程
🎈每个线程都会占用栈空间和 CPU 资源,并且线程的竞争和上下文切换会影响性能
🎈并不是每个 socket 都进行 I/O 操作,无意义的线程处理
🎈客户端的并发访问增加时,服务端将呈现 1:1 的线程开销,访问量越大,系统将越可能发生线程栈溢出,进而线程创建失败最终导致进程宕机或者僵死,从而不能对外提供服务。
🚀4. 伪异步 I/O 编程
🎉由于 BIO 模式下服务端接收多个客户端通信请求存在的种种弊端,下面将使用一个新的方式,伪异步 I/O 通信框架,采用线程池和任务队列实现,当客户端接入时,将客户端的 socket 封装成一个 Task(该任务实现 java.lang.Runnable 接口)交给后端的线程池中进行处理。
🎉JDK 的线程池维护一个消息队列和 N 个活跃的线程,对消息队列中 socket 任务进行处理,由于线程池可以设置消息队列的大小和最大线程数,因此可以控制资源使用,无论多少个客户端并发访问,都不会造成资源耗尽服务端宕机。
✨它的工作模式图解如下所示
✨客户端代码如下
public class Client1 { public static void main(String[] args) { try { //1.创建Socket对象请求服务端的连接 Socket socket = new Socket("127.0.0.1", 9000); //2.从Socket对象中获取一个字节输出流 OutputStream os = socket.getOutputStream(); //3.把字节输出流包装成一个打印流 PrintStream ps = new PrintStream(os); Scanner sc = new Scanner(System.in); //4.使用循环不断的发送消息给服务端接收 while (true) { System.out.print("请说:"); String msg = sc.nextLine(); ps.println(msg); ps.flush(); } } catch (IOException e) { e.printStackTrace(); } } }
✨线程池处理类代码如下
public class HandlerSocketServerPool { //1.创建一个线程池的成员变量用于存储一个线程池对象 private ExecutorService executorService; /** * 2.创建这个类的对象的时候需要初始化线程池对象 * public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, * TimeUnit unit, BlockingQueue<Runnable> workQueue) */ public HandlerSocketServerPool(int maxThreadNum, int queueSize) { executorService = new ThreadPoolExecutor(3, maxThreadNum, 120, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueSize)); } //3.提供一个方法来提交任务给线程池的任务队列来暂存,等待线程池处理 public void execute(Runnable target) { executorService.execute(target); } }
✨Socket 任务类代码如下
public class ServerRunnableTarget implements Runnable{ private Socket socket; public ServerRunnableTarget(Socket socket) { this.socket = socket; } @Override public void run() { //处理接收到的客户端socket通信请求 try { //1.从socket管道中得到一个字节输入流对象 InputStream is = socket.getInputStream(); //2.把字节流包装成一个缓存字符输入流 BufferedReader br = new BufferedReader(new InputStreamReader(is)); String msg; while ((msg = br.readLine()) != null) { System.out.println("服务端收到:"+msg); } } catch (IOException e) { e.printStackTrace(); } } }
✨服务端代码如下
/** 目标:开发实现伪异步通讯架构 思路:服务端每接收到一个客户端socket请求对象之后都交给一个独立的线程来处理客户端的数据交互需求 **/ public class Server { public static void main(String[] args) { try { //1.注册端口 ServerSocket ss = new ServerSocket(9999); //2.定义一个死循环,负责不断的接收客户端的Socket的连接请求 //初始化一个线程池对象 HandlerSocketServerPool pool = new HandlerSocketServerPool(3,10); while (true) { Socket socket = ss.accept(); //3.把socket对象交给一个线程池处理 //把socket封装成一个任务对象交给线程池处理 Runnable target = new ServerRunnableTarget(socket); pool.execute(); } catch(Exeception e) { e.printStackTrace(); } } } } /**
✨输出
服务端收到:client1 服务端收到:client2 服务端收到:client3 java.net.SocketException: Connection reset at java.net.SocketInputStream.read(SocketInputStream.java:210) at java.net.SocketInputStream.read(SocketInputStream.java:141) at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284) at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326) at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178) at java.io.InputStreamReader.read(InputStreamReader.java:184) at java.io.BufferedReader.fill(BufferedReader.java:161) at java.io.BufferedReader.readLine(BufferedReader.java:324) at java.io.BufferedReader.readLine(BufferedReader.java:389) at com.czh.ServerRunnableTarget.run(ServerRunnableTarget.java:30) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) 服务端收到:client4 java.net.SocketException: Connection reset at java.net.SocketInputStream.read(SocketInputStream.java:210) at java.net.SocketInputStream.read(SocketInputStream.java:141) at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284) at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326) at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178) at java.io.InputStreamReader.read(InputStreamReader.java:184) at java.io.BufferedReader.fill(BufferedReader.java:161) at java.io.BufferedReader.readLine(BufferedReader.java:324) at java.io.BufferedReader.readLine(BufferedReader.java:389) at com.czh.ServerRunnableTarget.run(ServerRunnableTarget.java:30) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) 服务端收到:client5
🎉总结
🎈伪异步采用了线程池实现,因此避免为每个请求创建一个独立线程造成线程资源耗尽的问题,但是由于依然是采用同步阻塞模型,因此无法从根本解决问题
🎈如果单个消息处理的缓慢,或者服务器线程池中的全部线程都被阻塞,那么后续 socket 的 I/O 消息都将在队列中排队,新的 socket 请求将被拒绝,客户端会发生大量连接超时
🚀5. 基于 BIO 模式的文件上传
✨基于 BIO 模式实现任意类型文件形式的上传,让服务端保存起来。
✨客户端代码如下
public class Client { public static void main(String[] args) { try { FileInputStream is = new FileInputStream("C:\\Users\\Desktop\\IMG_20220405_113738.jpg"); //1.创建Socket对象请求服务端的连接 Socket socket = new Socket("127.0.0.1", 9000); //2.把字节输出流包装成一个数据输出流(DataOutputStream可以做分段数据发送) DataOutputStream dos = new DataOutputStream(socket.getOutputStream()); //3.先发送上传文件的后缀给服务器 dos.writeUTF(".jpg"); //4.把文件数据发送给服务端进行接收 byte[] buffer = new byte[1024]; int len; while ((len = is.read(buffer)) > 0) { dos.write(buffer, 0, len); } dos.flush(); socket.shutdownOutput(); } catch (IOException e) { e.printStackTrace(); } } }
✨服务端代码如下
public class Server { public static void main(String[] args) { try { //1.定义一个ServerSocket对象请求进行服务端的端口注册 ServerSocket ss = new ServerSocket(9000); while (true) { Socket soc = ss.accept(); new ServerReadThread(soc).start(); } } catch (IOException e) { e.printStackTrace(); } } }
✨Socket 线程处理类代码如下
public class ServerReadThread extends Thread { private Socket socket; public ServerReadThread(Socket socket) { this.socket = socket; } @Override public void run() { try { //1.得到一个数据输入流来读取客户端发送过来的数据 DataInputStream dis = new DataInputStream(socket.getInputStream()); //2.读取客户端发送过来的文件类型 String suffix = dis.readUTF(); System.out.println("服务端已经成功接收到了文件类型:"+suffix); //3.定义一个字节输出管道,负责把客户端发送过来的文件数据写出去 FileOutputStream os = new FileOutputStream("C:\\Users\\Desktop\\" + UUID.randomUUID().toString() + suffix); //4.从数据输入流中读取文件数据,写出到字节输出流中去 byte[] buffer = new byte[1024]; int len; while ((len = dis.read(buffer)) > 0) { os.write(buffer, 0 , len); } os.close(); System.out.println("服务端接收文件保存成功!"); } catch (IOException e) { e.printStackTrace(); } } }
✨运行结果如下
服务端已经成功接收到了文件类型:.jpg 服务端接收文件保存成功!
🎉总结
🎈Java BIO 模式下,服务端与客户端必要保持一致的操作。客户端发送文件使用 DataOutputStream,那么服务端就使用 DataInputStream,客户端使用 dos.writeUTF(“.jpg”),服务端就使用 dis.readUTF()
🎈客户端发完数据后必须通知服务端自己已经发完,使用 shutdownOutput(),
🚀6. 基于 BIO 模式的端口转发
✨基于 BIO 模式的端口转发,实际上也就是一个客户端的消息可以发送给所有的客户端去接收,这也是实现群聊的基础。
✨基于 BIO 模式的端口转发工作模式图解如下
✨客户端代码如下
public class Client { public static void main(String[] args) { try { //1.创建Socket对象请求服务端的连接 Socket socket = new Socket("127.0.0.1", 9000); //收消息 Thread clientReaderThread = new ClientReaderThread(socket); clientReaderThread.start(); while (true) { //发消息 OutputStream os = socket.getOutputStream(); PrintStream ps = new PrintStream(os); //使用循环不断的发送消息给服务端接收 Scanner sc = new Scanner(System.in); String msg = sc.nextLine(); ps.println(msg); ps.flush(); } } catch (IOException e) { e.printStackTrace(); } } }
✨客户端线程处理类代码如下
public class ClientReaderThread extends Thread { private Socket socket; public ClientReaderThread(Socket socket) { this.socket = socket; } @Override public void run() { try { while (true) { InputStream is = socket.getInputStream(); //把字节输入流包装成一个缓存字符输入流 BufferedReader br = new BufferedReader(new InputStreamReader(is)); String msg; if ((msg = br.readLine()) != null) { System.out.println(msg); } } } catch (IOException e) { e.printStackTrace(); } } }
✨服务端代码如下
public class Server { //定义一个静态集合 public static List<Socket> allSocketOnLine = new ArrayList<>(); public static void main(String[] args) { try { //定义一个ServerSocket对象请求进行服务端的端口注册 ServerSocket ss = new ServerSocket(9000); while (true) { Socket soc = ss.accept(); //把登录的客户端socket存入到一个集合中去 allSocketOnLine.add(soc); //为当前登录成功的socket分配一个独立的线程来处理与之通信 new ServerReaderThread(soc).start(); } } catch (IOException e) { e.printStackTrace(); } } }
✨服务端线程处理类代码如下
public class ServerReaderThread extends Thread { private Socket socket; public ServerReaderThread(Socket socket) { this.socket = socket; } @Override public void run() { try { BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream())); String msg; while ((msg = br.readLine()) != null) { System.out.println("服务器收到消息:"+msg); } } catch (IOException e) { e.printStackTrace(); System.out.println("当前有人下线了!"); Server.allSocketOnLine.remove(socket); } } //把当前客户端发送过来的消息推送给全部在线的socket private void sendMsgToAllClient(String msg, Socket socket) throws Exception { for (Socket sk : Server.allSocketOnLine) { //只发送给除自己以外的其他客户端 if (socket != sk) { PrintStream ps = new PrintStream(sk.getOutputStream()); ps.println(msg); ps.flush(); } } } }
✨运行结果如下
--------------服务端--------------- 服务端收到消息:我是client1,大家听见我说话吗 服务端收到消息:我是client2,大家听见我说话吗 服务端收到消息:我是client3,大家听见我说话吗 --------------客户端1--------------- 我是client1,大家听见我说话吗 --发送 我是client2,大家听见我说话吗 --接收 我是client3,大家听见我说话吗 --接收 --------------客户端2--------------- 我是client1,大家听见我说话吗 --接收 我是client2,大家听见我说话吗 --发送 我是client3,大家听见我说话吗 --接收 --------------客户端3--------------- 我是client1,大家听见我说话吗 --接收 我是client2,大家听见我说话吗 --接收 我是client3,大家听见我说话吗 --发送