A 直接给 B 发送数据,就是耦合性比较强,开发 A 的时候就得考虑 B 是如何接收的,开发 B 的时候就得考虑 A 是如何发送的。极端情况下 A 出现问题挂了 可以能也造成 B 出现问题导致 B 也挂了,反之 B 出现了问题,也会牵连 A 导致 A 挂了。
于是在阻塞队列的影响下,A 和 B 不再直接交互
开发阶段:A 只用考虑自己和队列如何交互,B 也只用考虑自己和队列如何交互,A 和 B 之间都不需要知道对方的存在。
部署阶段:A 如果挂了,对 B 没有任何影响;B 如果挂了,对 A 没有任何影响。
- 能够做到 “削峰填谷” ,提高整个系统抗风险能力。
程序猿无法控制外网有多少个用户在访问 A,当出现极端情况,外网访问请求大量涌入的时候,A 把所有请求的数据一并转让给 B 的时候,B 就容易扛不住而挂掉。在阻塞队列的影响下
多出来的压力队列承担了,队列里多存一会儿数据就行了,即使 A 的压力比较大,B 仍按照固定的频率来取数据。
标准库中的阻塞队列
在 Java 标准库中内置了阻塞队列. 如果我们需要在一些程序中使用阻塞队列, 直接使用标准库中的即可
- BlockingQueue 是一个接口. 真正实现的类是 LinkedBlockingQueue.
- put 方法用于阻塞式的入队列, take 用于阻塞式的出队列.
- BlockingQueue 也有 offer, poll, peek 等方法, 但是这些方法不带有阻塞特性.
生产者消费者模型:
public class Demo20 { public static void main(String[] args) { BlockingDeque<Integer> queue = new LinkedBlockingDeque<>(); Thread customer = new Thread(()->{ while (true){ try { int value = queue.take(); System.out.println("消费元素:" + value); } catch (InterruptedException e) { e.printStackTrace(); } } }); customer.start(); Thread producer = new Thread(()->{ int n = 0; while (true){ try { System.out.println("生产元素:" + n); queue.put(n); n++; Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } }); producer.start(); }}
运行结果演示图:
阻塞队列实现:
- 自己模拟实现一个阻塞队列
- 基于数组的方式来实现队列
- 两个核心方法:1. put 入队列; 2. take 出队列
class MyBlockingQueue { // 假定最大是 1000 个元素,当然也可以设定成可配置的 private int[] items = new int[1000]; //队首的位置 private int head = 0; //队尾的位置 private int tail = 0; //队列的元素个数 private int size = 0; //入队列 public void put (int value) throws InterruptedException { synchronized (this) { while (size == items.length) { //队列已满,继续等待 this.wait(); } items[tail] = value; tail++; if (tail == items.length) { //注意 如果 tail 到达数组末尾,就需要从头开始 tail = 0; } size++; //即使没人在等待,多调用几次 notify 也没事,没负面影响 this.notify(); } } //出队列 public Integer take() throws InterruptedException { int ret = 0; synchronized (this) { while (size == 0) { //队列为空,就等待 this.wait(); } ret = items[head]; head++; if (head == items.length) { head = 0; } size--; this.notify(); } return ret; }}public class Demo21 { public static void main(String[] args) throws InterruptedException { MyBlockingQueue queue = new MyBlockingQueue(); queue.put(100); queue.take(); }}
- 入队列中的 wait 和出队列中的 notify 对应,满了之后,入队列就要阻塞等待,此时在取走元素之后,就可以尝试唤醒了。
- 入队列中的 notify 和出队列中的 wait 对应,队列为空,也要阻塞,此时在插入成功之后,队列就不为空了,就能够把 take 的等待唤醒。
- 一个线程中无法做到又等待又唤醒
- 阻塞之后,就要唤醒,阻塞和唤醒之间是沧海桑田,虽然按照当下代码是有元素插入成功了,条件不成立,等待结束。但是更稳妥的做法是把 if 换成 while ,在唤醒之后,再判断一次条件!万一条件又成立了呢?万一接下来要继续阻塞等待呢?
测试代码:
public class Demo21 { public static void main(String[] args) { MyBlockingQueue queue = new MyBlockingQueue(); Thread customer = new Thread(()->{ while (true){ int value = 0; try { value = queue.take(); System.out.println("消费:" + value); Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } }); customer.start(); Thread producer = new Thread(()->{ int value = 0; while (true){ try { queue.put(value); System.out.println("生产:" + value); value++; } catch (InterruptedException e) { e.printStackTrace(); } } }); producer.start(); }}
延缓了消费代码,也可以把生产代码延缓,调用 sleep 即可
- 延缓消费代码
- 延缓生产代码