19年面试的时候面试官叫我手写生产者/消费者模型,当时没有写出来。昨天刚好在《Thinking in Java》里看到讲这个,在这分享一下。

通过锁 wait()notify() 来实现。

生产者(Producer)和消费者(Consumer)同时关注产品(Product)的状态。即,有和没有。(这个实例只有一个生产者和一个消费者)。

当没有产品时

  • 生产者生产产品,并通知消费者消费(改变产品状态)
  • 消费者等待被通知

当有产品时

  • 生产者等待被通知
  • 消费者消费产品,并通知消费者 (改变产品状态)

相互通知来协调。

通过阻塞队列 (BlockingQueue) 来实现

有一个有限长度的产品(Product)队列,生产者(Producer)给队列加产品,消费者(Consumer)从队列取产品。

  • 队列满时生产者被阻塞
  • 队列空时消费者被阻塞

通过队列的阻塞来协调

阻塞队列对客户屏蔽了协同相关的细节,大大简化生产者和消费者实现复杂性。

Code

wait() and notify()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ProducerConsumer {

static class Product {
static long idn = 0;

private final long id;

public Product() {
id = idn++;
}

@Override
public String toString() {
return "Product: " + id;
}
}

class Producer implements Runnable {

@Override
public void run() {
try {
while (!Thread.interrupted()) {
synchronized (this) {
while (product != null) {
wait();
}
}

product = new Product();
System.out.println("Produce " + product);

synchronized (consumer) {
consumer.notifyAll();
}
}
} catch (InterruptedException e) {
System.out.println("Producer interrupted");
}
}
}

class Consumer implements Runnable {

@Override
public void run() {
try {
while (!Thread.interrupted()) {
synchronized (this) {
while (product == null) {
wait();
}
}

System.out.println("Consume " + product);
product = null;

synchronized (producer) {
producer.notifyAll();
}
}
} catch (InterruptedException e) {
System.out.println("Consumer interrupted");
}
}
}

private Product product;
private final Producer producer;
private final Consumer consumer;

ExecutorService executorService = Executors.newCachedThreadPool();

public ProducerConsumer() {
producer = new Producer();
consumer = new Consumer();

executorService.execute(producer);
executorService.execute(consumer);
}

public static void main(String[] args) {
new ProducerConsumer();
}
}

BlockingQueue:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class ProducerConsumer {

static class Product {
static long idn = 0;

private final long id;

public Product() {
id = idn++;
}

@Override
public String toString() {
return "Product: " + id;
}
}

class Producer implements Runnable {

@Override
public void run() {
while (!Thread.interrupted()) {
try {
Product product = new Product();
System.out.println("Consume " + product);
blockingQueue.put(product);
}catch (InterruptedException e) {
System.out.println("Producer interrupted");
}
}
}
}

class Consumer implements Runnable {

@Override
public void run() {
try {
while (!Thread.interrupted()) {
Product product = blockingQueue.take();
System.out.println("Consume " + product);
}
} catch (InterruptedException e) {
System.out.println("Consumer interrupted");
}
}
}

private final BlockingQueue<Product> blockingQueue = new LinkedBlockingQueue<>(1);

ExecutorService executorService = Executors.newCachedThreadPool();

public ProducerConsumer() {
Producer producer = new Producer();
Consumer consumer = new Consumer();

executorService.execute(producer);
executorService.execute(consumer);
}

public static void main(String[] args) {
new ProducerConsumer();
}
}

Output:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
> Task :ProducerConsumer.main()
Produce Product: 0
Consume Product: 0
Produce Product: 1
Consume Product: 1
Produce Product: 2
Consume Product: 2
Produce Product: 3
Consume Product: 3
Produce Product: 4
Consume Product: 4
Produce Product: 5
Consume Product: 5
Produce Product: 6
Consume Product: 6
Produce Product: 7
Consume Product: 7
Produce Product: 8
Consume Product: 8
...