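In an interview back in 2019, the interviewer asked me to write the producer/consumer model by hand, and I couldn't do it at the time. Yesterday I happened to come across the topic in "Thinking in Java", so I'm sharing it here.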
Implementing it with locks: wait() and notify()
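The producer (Producer) and the consumer (Consumer) both watch the state of the product (Product), that is, whether there is one or not. (This example has only one producer and one consumer.)
When there is no product:
- The producer produces a product and notifies the consumer to consume it (changing the product state).
- The consumer waits to be notified.
When there is a product:
- The producer waits to be notified.
- The consumer consumes the product and notifies the producer (changing the product state).
The two sides coordinate by notifying each other.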
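The protocol above can be sketched in a compact form. This is a simplified variant, not the code from the book: it assumes a single shared monitor (a hypothetical `SingleSlot` class) instead of one monitor per party, and it produces a fixed number of integers so the run terminates. The part worth noticing is that each `wait()` sits inside a `while` loop that re-checks the product state after waking up.

```java
import java.util.ArrayList;
import java.util.List;

class SingleSlotDemo {

    // Hypothetical one-item buffer: a null item means "no product".
    static class SingleSlot<T> {
        private T item;

        public synchronized void put(T value) throws InterruptedException {
            while (item != null) {
                wait();            // a product exists: the producer waits to be notified
            }
            item = value;          // change the product state
            notifyAll();           // notify the consumer
        }

        public synchronized T take() throws InterruptedException {
            while (item == null) {
                wait();            // no product: the consumer waits to be notified
            }
            T value = item;
            item = null;           // change the product state
            notifyAll();           // notify the producer
            return value;
        }
    }

    // Runs one producer and one consumer over `count` items and
    // returns the items in the order the consumer received them.
    static List<Integer> run(int count) {
        SingleSlot<Integer> slot = new SingleSlot<>();
        List<Integer> consumed = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    slot.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    consumed.add(slot.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(run(5)); // prints [0, 1, 2, 3, 4]
    }
}
```

With a single monitor, `notifyAll()` wakes whichever side is waiting, and the `while` guard makes a wakeup harmless if the state is still not the one that thread needs.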
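Implementing it with a blocking queue (BlockingQueue)
There is a bounded queue of products (Product): the producer (Producer) adds products to the queue, and the consumer (Consumer) takes products from it.
Coordination comes from the queue's blocking behavior: put() blocks while the queue is full, and take() blocks while it is empty.
The blocking queue hides the coordination details from its clients, which greatly simplifies the implementation of both the producer and the consumer.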
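To make "full" and "empty" concrete without starting any threads, here is a small sketch that uses the non-blocking `offer()` and `poll()` methods of `BlockingQueue`. The class name `BoundedQueueDemo` and the capacity of 1 are assumptions chosen for illustration; at the points where `offer()` returns `false` or `poll()` returns `null`, the blocking counterparts `put()` and `take()` would wait instead.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BoundedQueueDemo {

    // Probes a queue of capacity 1 and reports what each call returned.
    static String probe() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);

        boolean first = queue.offer("a");   // true: there is room
        boolean second = queue.offer("b");  // false: queue is full, put("b") would block here
        String taken = queue.poll();        // "a": the only element
        String empty = queue.poll();        // null: queue is empty, take() would block here

        return first + " " + second + " " + taken + " " + empty;
    }

    public static void main(String[] args) {
        System.out.println(probe()); // prints "true false a null"
    }
}
```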
Code
wait() and notify():
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class ProducerConsumer {

        static class Product {
            // Only the single producer thread creates products, so a plain counter is enough.
            static long idn = 0;

            private final long id;

            public Product() {
                id = idn++;
            }

            @Override
            public String toString() {
                return "Product: " + id;
            }
        }

        class Producer implements Runnable {
            @Override
            public void run() {
                try {
                    while (!Thread.interrupted()) {
                        // Wait on our own monitor until the previous product has been consumed.
                        synchronized (this) {
                            while (product != null) {
                                wait();
                            }
                        }
                        Product next = new Product();
                        System.out.println("Produce " + next);
                        // Publish the product and wake the consumer while holding the
                        // consumer's monitor, so the state change cannot be missed.
                        synchronized (consumer) {
                            product = next;
                            consumer.notifyAll();
                        }
                    }
                } catch (InterruptedException e) {
                    System.out.println("Producer interrupted");
                }
            }
        }

        class Consumer implements Runnable {
            @Override
            public void run() {
                try {
                    while (!Thread.interrupted()) {
                        // Wait on our own monitor until a product is available.
                        synchronized (this) {
                            while (product == null) {
                                wait();
                            }
                        }
                        System.out.println("Consume " + product);
                        // Clear the product and wake the producer while holding the producer's monitor.
                        synchronized (producer) {
                            product = null;
                            producer.notifyAll();
                        }
                    }
                } catch (InterruptedException e) {
                    System.out.println("Consumer interrupted");
                }
            }
        }

        private Product product;
        private final Producer producer;
        private final Consumer consumer;

        ExecutorService executorService = Executors.newCachedThreadPool();

        public ProducerConsumer() {
            producer = new Producer();
            consumer = new Consumer();

            executorService.execute(producer);
            executorService.execute(consumer);
        }

        public static void main(String[] args) {
            new ProducerConsumer();
        }
    }
BlockingQueue:
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;

    public class ProducerConsumer {

        static class Product {
            static long idn = 0;

            private final long id;

            public Product() {
                id = idn++;
            }

            @Override
            public String toString() {
                return "Product: " + id;
            }
        }

        class Producer implements Runnable {
            @Override
            public void run() {
                // The try block wraps the whole loop: an InterruptedException clears the
                // interrupt flag, so catching it inside the loop would keep the loop running.
                try {
                    while (!Thread.interrupted()) {
                        Product product = new Product();
                        System.out.println("Produce " + product);
                        blockingQueue.put(product); // blocks while the queue is full
                    }
                } catch (InterruptedException e) {
                    System.out.println("Producer interrupted");
                }
            }
        }

        class Consumer implements Runnable {
            @Override
            public void run() {
                try {
                    while (!Thread.interrupted()) {
                        Product product = blockingQueue.take(); // blocks while the queue is empty
                        System.out.println("Consume " + product);
                    }
                } catch (InterruptedException e) {
                    System.out.println("Consumer interrupted");
                }
            }
        }

        // Capacity 1: at most one product is queued at any time.
        private final BlockingQueue<Product> blockingQueue = new LinkedBlockingQueue<>(1);

        ExecutorService executorService = Executors.newCachedThreadPool();

        public ProducerConsumer() {
            Producer producer = new Producer();
            Consumer consumer = new Consumer();

            executorService.execute(producer);
            executorService.execute(consumer);
        }

        public static void main(String[] args) {
            new ProducerConsumer();
        }
    }
Output:
    > Task :ProducerConsumer.main()
    Produce Product: 0
    Consume Product: 0
    Produce Product: 1
    Consume Product: 1
    Produce Product: 2
    Consume Product: 2
    Produce Product: 3
    Consume Product: 3
    Produce Product: 4
    Consume Product: 4
    Produce Product: 5
    Consume Product: 5
    Produce Product: 6
    Consume Product: 6
    Produce Product: 7
    Consume Product: 7
    Produce Product: 8
    Consume Product: 8
    ...
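Both programs above keep running until the process is killed, because nothing ever interrupts the worker threads, so the "interrupted" branches are never reached. One way to exercise them is `ExecutorService.shutdownNow()`, which interrupts the running tasks. The sketch below is an assumption about how a stop could be added, not part of the original code: the class name `ShutdownDemo`, the 100 ms run time and the 1 s timeout are all made up for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class ShutdownDemo {

    // Lets a producer and a consumer run briefly, then interrupts them.
    // Returns true if both tasks finished within the timeout.
    static boolean runBriefly() {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(1);
        ExecutorService pool = Executors.newCachedThreadPool();

        pool.execute(() -> {
            try {
                int next = 0;
                while (!Thread.interrupted()) {
                    queue.put(next++);   // throws InterruptedException when interrupted
                }
            } catch (InterruptedException e) {
                // Interrupted while blocked in put(): let the task end.
            }
        });
        pool.execute(() -> {
            try {
                while (!Thread.interrupted()) {
                    queue.take();        // throws InterruptedException when interrupted
                }
            } catch (InterruptedException e) {
                // Interrupted while blocked in take(): let the task end.
            }
        });

        try {
            TimeUnit.MILLISECONDS.sleep(100);   // arbitrary run time for the sketch
            pool.shutdownNow();                 // interrupts both worker threads
            return pool.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("terminated: " + runBriefly());
    }
}
```

Because both loops exit either through the `Thread.interrupted()` check or through the `InterruptedException` thrown by `put()` and `take()`, the interrupt ends the task wherever the thread happens to be.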