文章目录
  1. 1. 优点
  2. 2. Code
    1. 2.1. wait() 和 notify() 实现
      1. 2.1.1. notify() notifyAll()
    2. 2.2. Blocking Queue 实现

优点

  • 方便开发,开发者可以对生产者和消费者进行单独编码,它们仅仅通过共享对象通信。

  • 生产者无需关心消费者是谁,有多少消费者;消费者同理;

  • 生产者和消费者可以各自已不同速度工作,不会发生生产者还没生产完成该物品,消费者就去消费的bug;

  • 将生产者和消费者功能上分开更利于写出 整洁、易读和易管理的代码。

Code

wait() 和 notify() 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
class Producer extends Thread{
private final Queue<Integer> sharedQueue;
public Producer(Queue<Integer> sharedQueue) {
super("Producer");
this.sharedQueue = sharedQueue;
}
@Override
public void run(){
for(int i = 1 ; i<5;i++){
synchronized (sharedQueue) {
while(sharedQueue.size() > 0){
try {
System.out.println("Queue is full, waiting!");
sharedQueue.wait();
} catch (Exception e) {
e.printStackTrace();
}
}//while
System.out.println("producing : " + i);
sharedQueue.add(i);
sharedQueue.notify();
}//synchronized
}
}
}
class Counsumer extends Thread{
private final Queue<Integer> sharedQueue;
public Counsumer(Queue<Integer> sharedQueue) {
super("Counsumer");
this.sharedQueue = sharedQueue;
}
@Override
public void run(){
while(true){
synchronized (sharedQueue) {
while(sharedQueue.size() == 0){
try {
System.out.println("Queue is empty, waiting~~");
sharedQueue.wait();
} catch (Exception e) {
e.printStackTrace();
}
}
int number = sharedQueue.remove();
System.out.println("consuming : " + number);
sharedQueue.notify();
//termination condition
if(number == 3){break; }
}
}
}
}
public class Main {
/**
* @param args
*/
public static void main(String[] args) {
Queue<Integer> sharedQueue = new LinkedList<>();
new Producer(sharedQueue).start();
new Counsumer(sharedQueue).start();
}
}
/*
* output:
* producing : 1
* Queue is full, waiting!
* consuming : 1
* Queue is empty, waiting~~
* producing : 2
* Queue is full, waiting!
* consuming : 2
* Queue is empty, waiting~~
* producing : 3
* Queue is full, waiting!
* consuming : 3
* producing : 4
*/

notify() notifyAll()

notifyAll使所有原来在该对象上等待被notify的线程统统退出wait的状态,变成等待该对象上的锁,一旦该对象被解锁,他们就会去竞争。

notify则文明得多他只是选择一个wait状态线程进行通知,并使它获得该对象上的锁,但不惊动其他同样在等待被该对象notify的线程们,当第一个线程运行完毕以后释放对象上的锁此时如果该对象没有再次使用notify语句,则即便该对象已经空闲,其他wait状态等待的线程。
由于没有得到该对象的通知,继续处在wait状态,直到这个对象发出一个notify或notifyAll,它们等待的是被notify或notifyAll,而不是锁。

Blocking Queue 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
/*
BlockingQuue is an interface and Java 5 provides different implantation like ArrayBlockingQueue and LinkedBlockingQueue.
BlockingQueue put() method will block if Queue is full in case of Bounded Queue and take() will block if Queue is empty.
*/
// put() and take()
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
class Producer implements Runnable{
private final BlockingQueue<Integer> sharedQueue;
public Producer(BlockingQueue<Integer> sharedQueue) {
this.sharedQueue = sharedQueue;
}
@Override
public void run() {
for(int i = 1 ;i < 5;i++ ){
try {
System.out.println("producing : " + i);
sharedQueue.put(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Consumer implements Runnable{
private final BlockingQueue<Integer> sharedQueue;
public Consumer(BlockingQueue<Integer> sharedQueue) {
this.sharedQueue = sharedQueue;
}
@Override
public void run() {
while(true){
try {
int number = sharedQueue.take();
System.out.println("consuming : " + number);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class Main {
/**
* @param args
*/
public static void main(String[] args) {
// TODO Auto-generated method stub
BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<>();
new Thread(new Producer(sharedQueue)).start();
new Thread(new Consumer(sharedQueue)).start();
}
}
/*
* output:
* producing : 1
* producing : 2
* consuming : 1
* consuming : 2
* producing : 3
* producing : 4
* consuming : 3
* consuming : 4
*/
文章目录
  1. 1. 优点
  2. 2. Code
    1. 2.1. wait() 和 notify() 实现
      1. 2.1.1. notify() notifyAll()
    2. 2.2. Blocking Queue 实现