wait与notify
# wait与notify
wait()和notify()是Object的成员函数。
- wait()/wait(long timeout):导致当前线程等待,直到另一个线程调用该对象的 notify()方法或 notifyAll()方法。
- notiy():唤醒正在等待对象监视器的单个线程。
- notifyAll():唤醒正在等待对象监视器的所有线程。
# 生产者−消费者模型
生产者-消费者模型是一个常见的多线程编程模型,,弄懂生产者消费者问题能够让我们对多线程编程的理解更加深刻。所谓生产消费者问题,实际上主要是包含了两类线程:
- 一类是生产者线程用于生产数据
- 一类是消费者线程用于消费数据
为了耦合生产者和消费者的关系,通常会采用共享的数据区域,就像一个仓库,生产者生产数据之后直接放置在共享数据区中,并不需要关心消费者的行为;消费者只需要从共享数据区中获取数据,并不需要关心生产者的行为
如下图所示:
一个内存队列,多个生产者线程往内存队列中放数据;多个消费者线程从内存队列中取数据。要实现这样一个编程模型,需要做下面几件事情:
- Setp1加锁,内存队列本身要加锁,才能实现线程安全。
- Setp2阻塞:当内存队列满了,生产者放不进去时,会被阻塞;当内存队列是空的时候,消费者无事可做,会被阻塞。
- Setp3双向通知:消费者被阻塞之后,生产者放入新数据,要notify()消费者;反之,生产者被阻塞之后,消费者消费了数据,要notify()生产者。
Setp1必须要做,Setp2、3不一定要做。例如,可以采取一个简单的办法,生产者放不进去之后,睡眠几百毫秒再重试,消费者取不到数据之后,睡眠几百毫秒再重试。但这个办法效率低下,也不实时。所以,我们只讨论如何阻塞、如何通知的问题。
# 如何阻塞
- 方法1:线程自己阻塞自己,也就是生产者、消费者线程各自调用wait()和notify()。
- 方法2:用一个阻塞队列,当取不到或者放不进去数据的时候,入队/出队函数本身就是阻塞的。
# 如何双向通知
- 方法1:wait()与notify()机制。
- 方法2:Condition机制。
# wait()与notify()实现
这里使用wait()与notify()机制实现,以及采用阻塞队列方式实现
# 单生产者单消费者
资源类
public class MyQueue {
// private Object lock = new Object();
private String[] data = new String[10];
// 下一条要存储记录的下标
private int putIndex = 0;
// 下一条要获取的元素下标
private int getIndex = 0;
// data中的元素个数
private int size = 0;
public synchronized void put(String element) {
if (size == data.length) {
try {
// 阻塞,等待
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
data[putIndex] = element;
++putIndex;
if (putIndex == data.length) putIndex = 0;
++size;
// 唤醒等待的消费者线程
notify();
}
public synchronized String get() {
if (size == 0) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
String result = data[getIndex];
++getIndex;
if (getIndex == data.length) getIndex = 0;
--size;
// 唤醒生产者生产。因为对象锁是当前对象,this
notify();
return result;
}
}
生产者线程
public class ProducerThread extends Thread {
private final MyQueue myQueue;
private int index = 0;
private final Random random = new Random();
public ProducerThread(MyQueue myQueue) {
this.myQueue = myQueue;
}
@Override
public void run() {
while (true) {
String tmp = "生产数据:" + index;
myQueue.put(tmp);
System.out.println(tmp);
index++;
try {
Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
消费者线程
public class ConsumerThread extends Thread {
private final MyQueue myQueue;
private final Random random = new Random();
public ConsumerThread(MyQueue myQueue) {
this.myQueue = myQueue;
}
@Override
public void run() {
while (true) {
String s = myQueue.get();
System.out.println("\t\t消费的数据:" + s);
try {
Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
主函数
public class Main {
public static void main(String[] args) throws InterruptedException {
MyQueue myQueue = new MyQueue();
new ConsumerThread(myQueue).start();
new ProducerThread(myQueue).start();
Thread.sleep(10000);
System.exit(0);
}
}
# 多生产者多消费者
资源类
public class MyQueue2 extends MyQueue {
private String[] data = new String[10];
// 下一条要存储记录的下标
private int putIndex = 0;
// 下一条要获取的元素下标
private int getIndex = 0;
// data中的元素个数
private int size = 0;
@Override
public synchronized void put(String element) {
if (size == data.length) {
try {
// 阻塞,等待
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 利用迭代,进行第二次抢对象锁
put(element);
} else {
put0(element);
}
}
private void put0(String element) {
data[putIndex] = element;
++size;
++putIndex;
if (putIndex == data.length) putIndex = 0;
// 唤醒等待的消费者线程
notify();
}
@Override
public synchronized String get() {
if (size == 0) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 利用迭代,重新获取共享锁
return get();
} else {
return get0();
}
}
private String get0() {
String result = data[getIndex];
++getIndex;
if (getIndex == data.length) getIndex = 0;
--size;
// 唤醒生产者生产。因为对象锁是当前对象,this
notify();
return result;
}
}
生产者、消费者线程同上(略)
主函数
public class Main2 {
public static void main(String[] args) throws InterruptedException {
MyQueue2 myQueue = new MyQueue2();
for (int i = 0; i < 5; i++) {
new ConsumerThread(myQueue).start();
new ProducerThread(myQueue).start();
}
Thread.sleep(10000);
System.exit(0);
}
}
# Object的实例函数
在Java里面,wait()和notify()是Object的成员函数。为什么Java要把wait()和notify()放在如此基础的类里面,而不是作为像Thread一类的成员函数,或者其他类的成员函数呢?
先看为什么wait()和notify()必须和synchronized一起使用?请看下面的代码:
class MyClass1 {
private Object obj1 = new Object();
public void method1() {
synchronized(obj1) {
//...
obj1.wait();
//...
}
}
public void method2() {
synchronized(obj1) {
//...
obj1.notify();
//...
}
}
}
等价于下面的代码
public class MyClass1 {
public void synchronized method1() {
//...
this.wait();
//...
}
public void synchronized method2() {
//...
this.notify();
//...
}
}
然后,开两个线程,线程A调用method1(),线程B调用method2()。答案已经很明显:两个线程之间要通信,对于同一个对象来说,一个线程调用该对象的wait(),另一个线程调用该对象的notify(),该对象本身就需要同步!所以,在调用wait()、notify()之前,要先通过synchronized关键字同步给对象,也就是给该对象加锁。
因此,synchronized关键字可以加在任何对象的实例方法上面,任何对象都可能成为锁。因此,wait()和notify()只能放在Object里面了。
# wait与sleep的区别
- wait:释放锁,进入阻塞状态。对于Java来说,线程状态为TIMED_WAITING或WAITING
- sleep:不释放锁,休息一会。对于Java来说,线程状态为TIMED_WAITING。
为什么wait必须释放锁?
假如有两个线程A、B,当线程A进入synchronized(obj1)中之后,也就是对obj1上了锁。此时,调用wait()进入阻塞状态,假如一直不能退出synchronized代码块;那么,线程B永远无法进入synchronized(obj1)同步块里,也就永远没有机会调用notify(),发生死锁。
public final native void wait(long timeout) throws InterruptedException;
这就涉及一个关键的问题:在wait()的内部,会先释放锁obj1,然后进入阻塞状态,之后,它被另外一个线程用notify()唤醒,重新获取锁!其次,wait()调用完成后,执行后面的业务逻辑代码,然后退出synchronized同步块,再次释放锁。
wait()内部的伪代码如下:
wait() {
// 释放锁, 避免死锁
// 阻塞,等待被其他线程notify
// 重新获取锁
}
# wait与notify的问题
以上述的生产者-消费者模型来看,其伪代码大致如下:
public void enqueue() {
synchronized(queue) {
while (queue.full()) {
queue.wait();
}
//... 数据入列
// 通知消费者,队列中有数据了。
queue.notify();
}
}
public void dequeue() {
synchronized(queue) {
while (queue.empty()) {
queue.wait();
}
// ...数据出队列
// 通知生产者,队列中有空间了,可以继续放数据了。
queue.notify();
}
}
生产者在通知消费者的同时,也通知了其他的生产者;消费者在通知生产者的同时,也通知了其他消费者。
原因在于wait()和notify()所作用的对象和synchronized所作用的**(锁)对象是同一个**,只能有一个对象,无法区分队列空和列队满两个条件。这正是后面JDK 5引入Condition
要解决的问题。