0

《Java-SE-第二十四章》之线程间协作

 1 year ago
source link: https://blog.51cto.com/u_15454299/6990420
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

在你立足处深挖下去,就会有泉水涌出!别管蒙昧者们叫嚷:“下边永远是地狱!”

博客主页:KC老衲爱尼姑的博客主页

 博主的github,平常所写代码皆在于此

共勉:talk is cheap, show me the code

作者是爪哇岛的新手,水平很有限,如果发现错误,一定要及时告知作者哦!感谢感谢!


  • 线程之间的协作
  • wait()
  • notify()方法
  • notifyAll()方法
  • wait 和 sleep 的对比(面试题)
  • 阻塞式队列
  • 阻塞队列是什么?
  • 标准库中阻塞队列类
  • 生产者-消费者模型
  • 为什么需要使用生产者-消费者模型
  • 生产者-消费者模型特点
  • 生产者-消费者模型作用
  • 基于BlockingQueue 实现生产者-消费者模型
  • 模拟阻塞队列
  • 基于模拟阻塞队列实现生产者-消费者模型
  • 任务间使用管道进行输入/输出

线程之间的协作

再次之前我们已经解决了,如果多个任务交替着步入某项共享资源,可以使用互斥来使得任何时刻只有一个任务可以访问这项资源。现在我们需要学习如何使任务彼此之间可以协作,可以达到多个任务一起工作去解决某个问题。现在的问题不是线程之间的干涉,而是线程之间的协作。线程之间的协调涉及到某些部分任务必须在其他 部分被解决之前解决。这非常像盖房子,必须先挖好房子的地基,然后同时设计好地基所需的钢结构和和水泥,而这两项任务必须在浇筑地基之前完成。水泥浇筑完之后才可以在此基础上砌墙。在这些任务中,某些可以并行执行,但是某些步骤需要所有的任务结束之后才能开动。

当线程协作时,关键的问题是这些任务之间的握手,所谓的握手可以视为一种通知机制。为了实现这种握手,依旧需要使用到互斥,在多线程环境下,互斥能保证只有一个线程可以响应某个信号,这样就可以避免多个线程之间的竞争。在互斥的基础上,我们为线程添加了一种新途径,可以将自身挂起,直到某些外部条件发生变化时,表示是时候这个线程可以干活了。这种握手可以通过Object的方法wait()和notify()来安全地实现。

wait()

wait()使得线程可以等待某个条件发生变化,而自身是无法改变这个条件。通常,这种条件将由另一个任务来改变。你肯定不想你的线程不断测试这个任务,不断的进行空循环,这个被称为忙等,通常是一种不良好的CPU周期使用方式。这就好比张三的舍友率先进入了厕所,巧了此时张三也想上厕所,张三就不断在敲门说:“你好了没”。因此wait()方法会在等待外界条件的时候会将任务挂起,并且只有在notify()或notifyAll()触发时,即表示发生某些感感兴趣的事物,这个线程才会被唤醒去检查所产生的变化。这个通知就像,舍友告诉张三我已经解决了,你可以进去了。wait通常搭配synchronized使用,脱离synchronized使用wait会直接抛出异常。所以使用wait首先得获取锁,然后使当前执行代码的线程进行等待,然后释放锁,当满足条件时会被唤醒,重新尝试获取锁。

wait 结束等待的条件:

  1. 其他线程调用该对象的 notify 方法.
  2. wait 等待时间超时 (wait 方法提供一个带有 timeout 参数的版本, 来指定等待时间).
  3. 其他线程调用该等待线程的 interrupted 方法, 导致 wait 抛出 InterruptedException 异常

代码示例: 观察wait()方法使用

public class WaitTask implements Runnable{
    private Object lock;

    public WaitTask(Object lock) {
        this.lock = lock;
    }

    @Override
    public void run() {
        synchronized (lock) {
            System.out.println("你好,我是:"+Thread.currentThread().getName());
            try {
                System.out.println("等待林妹妹回复");
                lock.wait();
              //lock.wait(1000);//具有时间的等待,过期不候。
                System.out.println("林妹妹回复我了");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        Object lock = new Object();
        Thread t1 = new Thread(new WaitTask(lock),"贾宝玉");
        t1.start();
    }
}

wait方法属于Object,而Object是被所有类都继承的。当我们调用的时候实际前面省略了this.wait是必须包含在同步代码块或者同步代码块中,其同步监视器的对象(锁 的对象)与this也就是当前的对象必须一致,不然会抛出IllegalMonitorStateException。

运行结果:

《Java-SE-第二十四章》之线程间协作_阻塞队列

该程序执行到wait之后就会一直等待下去,那么程序不可能一直等待下去,这个时候就该唤醒方法notify()出场 了。

IllegalMonitorStateException复现

public class WaitTask implements Runnable{
    private Object lock;

    public WaitTask(Object lock) {
        this.lock = lock;
    }

    @Override
    public void run() {
        synchronized (this) {
            System.out.println("你好,我是:"+Thread.currentThread().getName());
            try {
                System.out.println("等待林妹妹回复");
                lock.wait();
                System.out.println("林妹妹回复我了");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        Object lock = new Object();
        Thread t1 = new Thread(new WaitTask(lock),"贾宝玉");
        t1.start();
    }
}

运行结果:

《Java-SE-第二十四章》之线程间协作_开发语言_02

notify()方法

notify 方法是唤醒等待的线程,notify()所在的同步代码块或者同步方法的锁对象必须和wait方法所在的同步代码块或者同步方法的锁对象一致,不然不会唤醒。

  • 方法notify()也要在同步方法或同步块中调用,该方法是用来通知那些可能等待该对象的对象锁的其它线程,对其发出通知notify,并使它们重新获取该对象的对象锁。
  • 如果有多个线程等待,则有线程调度器随机挑选出一个呈 wait 状态的线程。(并没有 “先来后到”)
  • 在notify()方法后,当前线程不会马上释放该对象锁,要等到执行notify()方法的线程将程序执行完,也就是退出同步代码块之后才会释放对象锁。
public class NotifyTask implements Runnable {
    private Object lock;

    public NotifyTask(Object lock) {
        this.lock = lock;
    }

    @Override
    public void run() {
        synchronized (lock) {
            System.out.println("你好,我是:"+Thread.currentThread().getName());
            lock.notify();
        }
    }
    public static void main(String[] args) {
        Object lock = new Object();
        Thread t1 = new Thread(new WaitTask(lock),"贾宝玉");
        t1.start();
        Thread t2 = new Thread(new NotifyTask(lock),"林黛玉");
        t2.start();
    }
}

运行结果:

《Java-SE-第二十四章》之线程间协作_生产者-消费者_03

notifyAll()方法

notify方法只是唤醒某一个等待线程. 使用notifyAll方法可以一次唤醒所有的等待线程.

使用notifyAll()方法唤醒所有等待线程, 在上面的代码基础上做出修改,创建 3 个 WaitTask 实例. 1 个 NotifyTask 实例.。

public class NotifyTask implements Runnable {
    private Object lock;

    public NotifyTask(Object lock) {
        this.lock = lock;
    }

    @Override
    public void run() {
        synchronized (lock) {
            System.out.println("你们好,我是:"+Thread.currentThread().getName());
            lock.notifyAll();
        }
    }
    public static void main(String[] args) throws InterruptedException {
        Object lock = new Object();
        Thread t1 = new Thread(new WaitTask(lock),"贾宝玉");
        Thread t2 = new Thread(new WaitTask(lock),"妙玉");
        Thread t3 = new Thread(new WaitTask(lock),"史湘云");
        Thread t4 = new Thread(new NotifyTask(lock),"林黛玉");
        t1.start();
        t2.start();
        t3.start();
        Thread.sleep(2000);
        t4.start();
    }
}

运行结果:

《Java-SE-第二十四章》之线程间协作_数据_04

wait 和 sleep 的对比(面试题)

理论上wait和sleep没有可比性,因为一个是用于线程通信,一个是让线程阻塞一段时间。唯一的相同点就是让线程放弃执行一段时间。

在此就浅浅的总结:

  1. wait 需要搭配 synchronized 使用. sleep 不需要.
  2. wait 是 Object 的方法 sleep 是 Thread 的静态方法.

阻塞式队列

阻塞队列是什么?

阻塞队列是一种特殊的队列. 也遵守 “先进先出” 的原则,在此基础上,如果队列满的时候,继续入队列就会阻塞,到有其他线程从队列中取走元素。如果队列空的时候,继续出队列也会阻塞, 直到有其他线程往队列中插入元素。、

标准库中阻塞队列类

在 Java 标准库中内置了阻塞队列. 如果我们需要在一些程序中使用阻塞队列, 直接使用标准库中的即可. BlockingQueue 是个接口,需要使用它的实现之一来使用 BlockingQueue,java.util.concurrent 包下具有以下 BlockingQueue 接口的实现类:

JDK 提供了 7 个阻塞队列。分别是

  • ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列
  • LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列
  • PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列
  • DelayQueue:一个使用优先级队列实现的无界阻塞队列
  • SynchronousQueue:一个不存储元素的阻塞队列
  • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列(实现了继承于 BlockingQueue 的 TransferQueue)
  • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列

BlockingQueue 主要提供四类方法,如下表所示

返回特定值

阻塞特定时间

add(e)

offer(e)

put(e)

offer(e,time,unit)

remove()

poll()

take()

poll(e,time,unit)

获取队首元素

element()

peek()

生产者-消费者模型

假设有两个线程分别是线程A和线程B,两个线程共享一个缓冲区,线程A负责往缓冲区中放入数据,线程B往缓冲区取出数据,那么这就是 生产者-消费者模型,其中线程A就是生产者,线程B就是消费者。

为什么需要使用生产者-消费者模型

在多线程环境下,如果生产者生产数据的速度足够快,而消费者消费数据的速度相对于生产者比慢,那么生产者就得等到消费者把数据消费完了再生产,因为生产者再生产数据没地方放啊!!!。同理,如果消费者消费的速度赶上了生产者生产的速度,那么消费者就经常处于等待状态。所以 为了平衡生产者和消费者之间的生产和消费数据的能力,就引入了缓冲区来存储生产者生产的数据,所以就有生产者-消费者模型。

生产者-消费者模型特点

  • 保证生产者不会在缓冲区满的时候继续向缓冲区放入数据,而消费者也不会在缓冲区空的时候,消耗数据。
  • 当缓冲区满的时候,生产者会进入等待状态,当下次消费者开始消耗缓冲区的数据时,生产者才会被唤醒,开始往缓冲区中添加数据;当缓冲区空的时候,消费者也会进入等待状态,直到生产者往缓冲区中添加数据时才会被唤醒

生产者-消费者模型作用

  1. 削峰填谷:当服务器短时间收到了大量的请求,服务器可能直接被打没了,为了避免服务器宕机,可以将请求放到一个阻塞队列中,然后再由消费者线程慢慢的来处理每个请求.
  2. 解耦:生产者不需要关心谁去消费数据,反正有人消费就行。消费者不需要关心生产数据,反正有人生产就行。

基于BlockingQueue 实现生产者-消费者模型

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

public class ProducerConsumerModel {
    private  static int count;
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> blockingQue = new LinkedBlockingDeque<>();
        Thread consumer = new Thread(() -> {
            while (true) {
                try {
                    Integer num = blockingQue.take();
                    TimeUnit.MILLISECONDS.sleep(1000);
                    count++;
                    System.out.println("消费者消费了"+count+"个数据,"+"数据是:"+num);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        consumer.start();
        Thread producer = new Thread(() -> {
            while (true) {
                Random rand = new Random();
                try {
                    Integer num = rand.nextInt();
                    blockingQue.put(num);
                    TimeUnit.MILLISECONDS.sleep(1000);
                    count++;
                    System.out.println("生产者生产了"+count+"个数据,"+"数据是:"+num);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        producer.start();
        consumer.join();
        producer.join();
    }

运行结果:

《Java-SE-第二十四章》之线程间协作_开发语言_05

模拟阻塞队列

使用循环队列以及synchronized来模拟阻塞队列

public class BlockingQueue {
    /**
     * 队列数据
     */
    private int[] elem = new int[100];
    /**
     * 队头指针
     */
    private int head;
    /**
     * 队尾指针
     */
    private int tail;
    /**
     * 队列元素个数
     */
    private int size;


    /**
     * 出队头元素
     * @return
     */
    public Integer take() throws InterruptedException {
        synchronized (this) {
            if (size == 0) {
                //队列为空
                wait();
            }
            int ret = elem[head];
            head++;
            //作用等价于 head %= elem.length
            if (head >= elem.length) {
                head = 0;
            }
            size--;
            notifyAll();
            return ret;
        }
    }

    /**
     * 入队尾元素
     * @param val
     */
    public void put(int val) throws InterruptedException {
        synchronized (this) {
            while (size == elem.length) {
                //队列满
                wait();
            }
            elem[tail++] = val;
            //作用等价于 tail %= elem.length
            if (tail >= elem.length) {
                tail = 0;
            }
            size++;
            notifyAll();
        }
    }

}

基于模拟阻塞队列实现生产者-消费者模型

import java.util.Random;

import java.util.concurrent.TimeUnit;

public class ProducerConsumerModel {
    private  static int count;
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue blockingQue = new BlockingQueue();
        Thread consumer = new Thread(() -> {
            while (true) {
                try {
                    Integer num = (Integer) blockingQue.take();
                    TimeUnit.MILLISECONDS.sleep(1000);
                    System.out.println("消费者消费了"+count+"个数据,"+"数据是:"+num);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        consumer.start();
        Thread producer = new Thread(() -> {
            while (true) {
                Random rand = new Random();
                try {
                    Integer num = rand.nextInt();
                    count++;
                    blockingQue.put(num);
                    TimeUnit.MILLISECONDS.sleep(1000);
                    System.out.println("生产者生产了"+count+"个数据,"+"数据是:"+num);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        producer.start();
        consumer.join();
        producer.join();
    }
}

运行结果:

《Java-SE-第二十四章》之线程间协作_数据_06

任务间使用管道进行输入/输出

Java 中以标准库的形式支持了对线程间的输入/输出。其中输出类库中的对应物是PipedWriter类,允许任务向管道写,输入类库中的对应物是PipedReader类,允许不同的任务从同一个管道中读取。管道基本上是一个阻塞队列,而任务间使用管道进行输入/输出,可以看做是生产者-消费者”问题的变体。

下面是一个简单例子,两个任务使用一个管道进行通信。

Sender负责向管道写数据

import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;
import java.util.Random;
import java.util.concurrent.TimeUnit;

public class Sender implements Runnable {
    private Random random = new Random(47);
    private PipedWriter out = new PipedWriter();
    public PipedWriter getPipedWriter() {
        return out;
    }
    @Override
    public void run() {
        try {
            while (true) {
                for (char c='A'; c <= 'Z'; c++) {
                    out.write(c);
                    TimeUnit.MILLISECONDS.sleep(random.nextInt(500));
                }
            }
        } catch (IOException e) {
            System.out.println(e+"Sender writer exception");
        } catch (InterruptedException e) {
            System.out.println(e+"Sender sleep exception");
        }
    }
}

Receiver负责向管道读数据

import java.io.IOException;
import java.io.PipedReader;

public class Receiver implements Runnable {
    private PipedReader in;

    public Receiver(Sender sender) throws IOException {
        in = new PipedReader(sender.getPipedWriter());
    }

    @Override
    public void run() {
        try {
            while (true) {
                System.out.println("Read:"+(char)in.read());
            }
        } catch (IOException e) {
            System.out.println(e+"Receiver read exception");
        }
    }
}
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class PipedIO {
    public static void main(String[] args) throws IOException, InterruptedException {
        Sender sender = new Sender();
        Receiver receiver = new Receiver(sender);
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(sender);
        exec.execute(receiver);
        TimeUnit.SECONDS.sleep(6);
        exec.shutdownNow();
    }
}

运行结果:

Read:A
Read:B
Read:C
Read:D
Read:E
Read:F
Read:G
Read:H
Read:I
Read:J
Read:K
Read:L
Read:M
Read:N
Read:O
Read:P
Read:Q
Read:R
Read:S
Read:T
Read:U
Read:V
Read:W
Read:X
Read:Y
Read:Z
java.lang.InterruptedException: sleep interruptedSender sleep exception
java.io.InterruptedIOExceptionReceiver read exception

Process finished with exit code 0


Recommend

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK