万隆的笔记 万隆的笔记
博文索引
笔试面试
  • 在线学站

    • 菜鸟教程 (opens new window)
    • 入门教程 (opens new window)
    • Coursera (opens new window)
  • 在线文档

    • w3school (opens new window)
    • Bootstrap (opens new window)
    • Vue (opens new window)
    • 阿里开发者藏经阁 (opens new window)
  • 在线工具

    • tool 工具集 (opens new window)
    • bejson 工具集 (opens new window)
    • 文档转换 (opens new window)
  • 更多在线资源
  • Changlog
  • Aboutme
GitHub (opens new window)
博文索引
笔试面试
  • 在线学站

    • 菜鸟教程 (opens new window)
    • 入门教程 (opens new window)
    • Coursera (opens new window)
  • 在线文档

    • w3school (opens new window)
    • Bootstrap (opens new window)
    • Vue (opens new window)
    • 阿里开发者藏经阁 (opens new window)
  • 在线工具

    • tool 工具集 (opens new window)
    • bejson 工具集 (opens new window)
    • 文档转换 (opens new window)
  • 更多在线资源
  • Changlog
  • Aboutme
GitHub (opens new window)
  • JUC介绍
  • 并发编程核心概念与主要内容
  • Java线程创建与使用
  • 线程生命周期
  • synchronized关键字
  • wait与notify
    • 生产者−消费者模型
    • Object的实例函数
    • wait与sleep的区别
    • wait与notify的问题
  • 线程中断-interrupt
  • 线程优雅关闭
  • JMM内存模型
  • volatile关键字
  • final关键字
  • Lock
  • JUC并发编程
2022-05-06
目录

wait与notify

# wait与notify

wait()和notify()是Object的成员函数。

  • wait()/wait(long timeout):导致当前线程等待,直到另一个线程调用该对象的 notify()方法或 notifyAll()方法。
  • notiy():唤醒正在等待对象监视器的单个线程。
  • notifyAll():唤醒正在等待对象监视器的所有线程。

# 生产者−消费者模型

生产者-消费者模型是一个常见的多线程编程模型,,弄懂生产者消费者问题能够让我们对多线程编程的理解更加深刻。所谓生产消费者问题,实际上主要是包含了两类线程:

  • 一类是生产者线程用于生产数据
  • 一类是消费者线程用于消费数据

为了耦合生产者和消费者的关系,通常会采用共享的数据区域,就像一个仓库,生产者生产数据之后直接放置在共享数据区中,并不需要关心消费者的行为;消费者只需要从共享数据区中获取数据,并不需要关心生产者的行为

如下图所示:

prod_consumers.png

一个内存队列,多个生产者线程往内存队列中放数据;多个消费者线程从内存队列中取数据。要实现这样一个编程模型,需要做下面几件事情:

  • Setp1加锁,内存队列本身要加锁,才能实现线程安全。
  • Setp2阻塞:当内存队列满了,生产者放不进去时,会被阻塞;当内存队列是空的时候,消费者无事可做,会被阻塞。
  • Setp3双向通知:消费者被阻塞之后,生产者放入新数据,要notify()消费者;反之,生产者被阻塞之后,消费者消费了数据,要notify()生产者。

Setp1必须要做,Setp2、3不一定要做。例如,可以采取一个简单的办法,生产者放不进去之后,睡眠几百毫秒再重试,消费者取不到数据之后,睡眠几百毫秒再重试。但这个办法效率低下,也不实时。所以,我们只讨论如何阻塞、如何通知的问题。

# 如何阻塞

  • 方法1:线程自己阻塞自己,也就是生产者、消费者线程各自调用wait()和notify()。
  • 方法2:用一个阻塞队列,当取不到或者放不进去数据的时候,入队/出队函数本身就是阻塞的。

# 如何双向通知

  • 方法1:wait()与notify()机制。
  • 方法2:Condition机制。

# wait()与notify()实现

这里使用wait()与notify()机制实现,以及采用阻塞队列方式实现

# 单生产者单消费者

资源类

public class MyQueue {

//    private Object lock = new Object();

    private String[] data = new String[10];
    // 下一条要存储记录的下标
    private int putIndex = 0;
    // 下一条要获取的元素下标
    private int getIndex = 0;
    // data中的元素个数
    private int size = 0;


    public synchronized void put(String element) {
        if (size == data.length) {
            try {
                // 阻塞,等待
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        data[putIndex] = element;
        ++putIndex;
        if (putIndex == data.length) putIndex = 0;
        ++size;
        // 唤醒等待的消费者线程
        notify();
    }

    public synchronized String get() {
        if (size == 0) {
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        String result = data[getIndex];
        ++getIndex;
        if (getIndex == data.length) getIndex = 0;
        --size;
        // 唤醒生产者生产。因为对象锁是当前对象,this
        notify();
        return result;
    }

}


生产者线程

public class ProducerThread extends Thread {

    private final MyQueue myQueue;
    private int index = 0;
    private final Random random = new Random();

    public ProducerThread(MyQueue myQueue) {
        this.myQueue = myQueue;
    }

    @Override
    public void run() {
        while (true) {
            String tmp = "生产数据:" + index;
            myQueue.put(tmp);
            System.out.println(tmp);
            index++;
            try {
                Thread.sleep(random.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

消费者线程

public class ConsumerThread extends Thread {

    private final MyQueue myQueue;
    private final Random random = new Random();

    public ConsumerThread(MyQueue myQueue) {
        this.myQueue = myQueue;
    }

    @Override
    public void run() {
        while (true) {
            String s = myQueue.get();
            System.out.println("\t\t消费的数据:" + s);
            try {
                Thread.sleep(random.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

主函数

public class Main {
    public static void main(String[] args) throws InterruptedException {
        MyQueue myQueue = new MyQueue();
        new ConsumerThread(myQueue).start();
        new ProducerThread(myQueue).start();

        Thread.sleep(10000);
        System.exit(0);
    }
}

# 多生产者多消费者

资源类

public class MyQueue2 extends MyQueue {

    private String[] data = new String[10];
    // 下一条要存储记录的下标
    private int putIndex = 0;
    // 下一条要获取的元素下标
    private int getIndex = 0;
    // data中的元素个数
    private int size = 0;


    @Override
    public synchronized void put(String element) {
        if (size == data.length) {
            try {
                // 阻塞,等待
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 利用迭代,进行第二次抢对象锁
            put(element);
        } else {
            put0(element);
        }

    }

    private void put0(String element) {
        data[putIndex] = element;
        ++size;
        ++putIndex;
        if (putIndex == data.length) putIndex = 0;
        // 唤醒等待的消费者线程
        notify();
    }

    @Override
    public synchronized String get() {
        if (size == 0) {
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 利用迭代,重新获取共享锁
            return get();
        } else {
            return get0();
        }
    }

    private String get0() {
        String result = data[getIndex];
        ++getIndex;
        if (getIndex == data.length) getIndex = 0;
        --size;
        // 唤醒生产者生产。因为对象锁是当前对象,this
        notify();
        return result;
    }

}

生产者、消费者线程同上(略)

主函数

public class Main2 {
    public static void main(String[] args) throws InterruptedException {
        MyQueue2 myQueue = new MyQueue2();
        for (int i = 0; i < 5; i++) {
            new ConsumerThread(myQueue).start();
            new ProducerThread(myQueue).start();
        }

        Thread.sleep(10000);
        System.exit(0);
    }
}

# Object的实例函数

在Java里面,wait()和notify()是Object的成员函数。为什么Java要把wait()和notify()放在如此基础的类里面,而不是作为像Thread一类的成员函数,或者其他类的成员函数呢?

先看为什么wait()和notify()必须和synchronized一起使用?请看下面的代码:

class MyClass1 { 
  private Object obj1 = new Object(); 
  public void method1() { 
    synchronized(obj1) { 
      //... 
      obj1.wait(); 
      //... 
    } 
  }
  public void method2() { 
    synchronized(obj1) { 
      //... 
      obj1.notify(); 
      //... 
    } 
  } 
}

等价于下面的代码

public class MyClass1 { 
  public void synchronized method1() { 
    //... 
    this.wait(); 
    //... 
  }
  public void synchronized method2() { 
    //... 
    this.notify(); 
    //... 
  } 
}

然后,开两个线程,线程A调用method1(),线程B调用method2()。答案已经很明显:两个线程之间要通信,对于同一个对象来说,一个线程调用该对象的wait(),另一个线程调用该对象的notify(),该对象本身就需要同步!所以,在调用wait()、notify()之前,要先通过synchronized关键字同步给对象,也就是给该对象加锁。

因此,synchronized关键字可以加在任何对象的实例方法上面,任何对象都可能成为锁。因此,wait()和notify()只能放在Object里面了。

# wait与sleep的区别

  • wait:释放锁,进入阻塞状态。对于Java来说,线程状态为TIMED_WAITING或WAITING
  • sleep:不释放锁,休息一会。对于Java来说,线程状态为TIMED_WAITING。

为什么wait必须释放锁?

假如有两个线程A、B,当线程A进入synchronized(obj1)中之后,也就是对obj1上了锁。此时,调用wait()进入阻塞状态,假如一直不能退出synchronized代码块;那么,线程B永远无法进入synchronized(obj1)同步块里,也就永远没有机会调用notify(),发生死锁。

public final native void wait(long timeout) throws InterruptedException;

这就涉及一个关键的问题:在wait()的内部,会先释放锁obj1,然后进入阻塞状态,之后,它被另外一个线程用notify()唤醒,重新获取锁!其次,wait()调用完成后,执行后面的业务逻辑代码,然后退出synchronized同步块,再次释放锁。

wait()内部的伪代码如下:

wait() { 
  // 释放锁, 避免死锁
  // 阻塞,等待被其他线程notify 
  // 重新获取锁 
} 

# wait与notify的问题

以上述的生产者-消费者模型来看,其伪代码大致如下:

public void enqueue() { 
  synchronized(queue) { 
    while (queue.full()) { 
      queue.wait();
    }
    //... 数据入列 
    
    // 通知消费者,队列中有数据了。
    queue.notify(); 
  } 
}

public void dequeue() { 
  synchronized(queue) { 
    while (queue.empty()) { 
      queue.wait(); 
    }
    // ...数据出队列 
    // 通知生产者,队列中有空间了,可以继续放数据了。
    queue.notify();  
  } 
}

生产者在通知消费者的同时,也通知了其他的生产者;消费者在通知生产者的同时,也通知了其他消费者。

原因在于wait()和notify()所作用的对象和synchronized所作用的**(锁)对象是同一个**,只能有一个对象,无法区分队列空和列队满两个条件。这正是后面JDK 5引入Condition要解决的问题。

上次更新: 5/30/2023, 12:05:21 AM
线程中断-interrupt

线程中断-interrupt→

最近更新
01
2025
01-15
02
Elasticsearch面试题
07-17
03
Elasticsearch进阶
07-16
更多文章>
Theme by Vdoing
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式