3

Java基础笔记(三) 多线程

 2 years ago
source link: https://songlee24.github.io/2016/03/19/java-basic-note-3/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

本文主要是我在看《疯狂Java讲义》时的读书笔记,阅读的比较仓促,就用 markdown 写了个概要。

第十章 多线程

1、进程和线程的概念

当一个程序进入内存运行时,即变成一个进程(Process)。进程是系统进行资源分配和调度的一个独立单位。—— 进程三特征:

  • 独立性:进程是系统中独立存在的实体,拥有自己独立的资源和地址空间;
  • 动态性:进程与程序的区别在于,程序只是一个静态的指令集合,而进程是一个正在系统中执行的指令集合,进程具有自己的生命周期和状态;
  • 并发性:多个进程可以在单个处理器上并发执行,互不影响。(与并行不是一个概念)

线程(Thread)是进程的执行单元,独立、并发的执行流,被称为轻量级进程。

  • 线程可以拥有自己的堆栈、自己的程序计数器、自己的局部变量,但不拥有系统资源
  • 一个进程中可以有多个线程,它们共享该进程的全部资源
  • 多个线程并发地执行

2、线程的创建

>> 继承Thread类

定义Thread类的子类,并重写该类的run()方法。

public class MyThread extends Thread {
private int i;

@Override
public void run() {
for( ;i<100;++i) {
System.out.println(getName()+" "+i);
}
}

public static void main(String[] args) {
new MyThread().start(); // 注意三个线程并不会共享实例变量i
new MyThread().start();
new MyThread().start();
}
}

>> 实现Runnable接口

定义一个类实现Runnable接口,并重写该接口的run()方法。(将该类对象作为Thread对象的target)

public class MyTarget implements Runnable {
private int i;

@Override
public void run() {
for( ;i<100;++i) {
System.out.println(Thread.currentThread().getName()+" "+i);
}
}

public static void main(String[] args) {
MyTarget target = new MyTarget();
new Thread(target, "新线程1").start();
new Thread(target, "新线程2").start();
}
}

这里创建的多个线程共享同一个target,所以它们也共享target对象中的实例变量i

>> 使用Callable和Future

Callable接口可以看作 Runnable 接口的增强版,它提供了一个call()方法作为线程执行体。该方法相比 run 的强大之处在于:① call() 方法可以有返回值;② call() 方法可以声明抛出异常。

但是存在两个问题:

  • Callable 接口并不是 Runnable 接口的子接口,所以 Callable 对象不能直接作为 Thread 的 target
  • call()方法作为线程执行体被执行,如何获取call方法的返回值

为了解决 返回值的获取问题,Java提供了Future接口,用于将 Callable 对象再进行一层包装。Future接口提供了三种功能:

  1. 判断任务是否完成;
  2. 取消任务;
  3. 获取任务执行结果,即call方法的返回值。

FutureTask类是Future接口的典型实现类,同时它还实现了Runnable接口 —— 这就完美的解决了上面的两个问题。

import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

class MyTarget implements Callable<Integer> { // 泛型,类型实参Integer表示返回值类型
private int i;
@Override
public Integer call() throws Exception {
for( ;i<100;++i) {
System.out.println(Thread.currentThread().getName()+" "+i);
}
return new Integer(i);
}

public static void main(String[] args) {
MyTarget target = new MyTarget();
FutureTask<Integer> task = new FutureTask<>(target); // 包装
new Thread(task, "新线程").start(); // 以FutureTask对象为target启动线程
try {
System.out.println("新线程中call返回值为"+task.get());
} catch(Exception e) {
e.printStackTrace();
}
}
}

从上面的代码可以看出几点:

  • Callable接口和FutureTask类都有泛型限制,且类型参数表示返回值类型
  • 重写 call() 方法时,可以带返回值、抛出异常;
  • 一般比较耗时的作业可以交给Future对象在后台完成,当主线程将来需要时,再去获取结果。

3、线程的生命周期

如上图所示,线程的生命周期有五种状态:新建(New)、就绪(Runnable)、运行(Running)、阻塞(Blocked)和死亡(Dead)。

  • 当程序new了一个线程对象后,该线程就处于新建状态;
  • 当线程对象调用了start()方法之后,该线程将处于就绪状态;
  • 当处于就绪状态的线程获得了CPU,开始执行线程执行体,该线程处于运行状态;
  • 当处于运行状态的线程调用了sleep/进行阻塞IO/等待同步锁…等时,该线程进入阻塞状态;
  • 当处于阻塞状态的线程sleep结束/IO返回/获得同步锁…等时,该线程将重新进入就绪状态;
  • 当处于运行状态的线程 执行完任务/抛出异常/调用了stop方法时,线程结束并处于死亡状态。

注意:线程只能从阻塞状态进入就绪状态,无法直接进入运行状态。

4、线程控制

>> join线程

Thread 提供了一个让一个线程等待另一个线程完成的方法 —— join()方法。当在线程A中调用了线程B的 join() 方法,线程A将被阻塞,直到线程B执行完为止:

class ThreadA extends Thread {
private ThreadB b;
public ThreadA(ThreadB b) {
super("Thread A");
this.b = b;
}
@Override
public void run(){
try {
b.join();
} catch(Exception e) {
e.printStackTrace();
}
for(int i=0; i<10; ++i) {
System.out.println(getName()+": "+i);
}
}
}

class ThreadB extends Thread {
public ThreadB() {
super("Thread B");
}
@Override
public void run() {
try {
for(int i=0; i<10; ++i) {
System.out.println(getName()+": "+i);
Thread.sleep(1000); // 每次sleep一秒
}
} catch(Exception e) {
e.printStackTrace();
}
}
}

public class JoinDemo {
public static void main(String[] args) {
ThreadB b = new ThreadB();
ThreadA a = new ThreadA(b);
a.start();
b.start();
}
}



>> 后台线程(守护线程)

后台线程(Daemon Thread):又叫“守护线程”或“精灵线程”,它在后台运行,为其他线程提供服务。比如 JVM 的垃圾回收线程。

后台线程有个特征:如果所有的前台线程都死亡,后台线程会自动死亡(JVM退出)。调用Thread对象的setDaemon(true)方法可以将线程设置成后台线程:

public class DaemonThread extends Thread {
@Override
public void run() {
for(int i=0; i<1000; ++i) {
System.out.println(getName()+" "+i);
}
}

public static void main(String[] args) throws Exception {
DaemonThread t = new DaemonThread();
t.setDaemon(true); // 在启动之前设置为后台线程
t.start();
for(int i=0; i<10; ++i) {
System.out.println(Thread.currentThread().getName()+" "+i);
Thread.sleep(1);
}
//-----前台线程(main线程)结束,后台线程随之结束,将无法运行到999-----
}
}



>> 线程休眠:sleep

通过调用Thread.sleep()静态方法,可以让正在运行的线程休眠,进入阻塞状态。


>> 线程让步:yield

通过调用Thread.yield()静态方法,可以让正在运行的线程停止,进入就绪状态。

完全有可能的情况是:当某个线程调用yield()方法停止之后,系统的线程调度器又立马将其调度出来重新执行。(这样的话该方法的功能效果就不明显了)


>> 改变线程优先级

每个线程执行时都具有一定的优先级,优先级高的会优先被线程调度器调度。一个线程默认的优先级与创建它的父线程相同,在默认情况下,main线程具有普通优先级(5)。

Thread 类提供了setPriority(int priority)getPriority()方法来设置和返回指定线程的优先级,其中,优先级是一个整数(范围0~10),Thread类还提供了三个静态常量:

  • MAX_PRIORITY:其值是10
  • MIN_PRIORITY:其值是1
  • NORM_PRIORITY:其值是5

5、线程同步

>> 什么是线程安全

个人认为最好的解释: 如果你的代码在多线程下执行和在单线程下执行永远都能获得一样的结果,那么你的代码就是线程安全的 。

线程安全也是有几个级别的:

  • 不可变:不可变的对象一定是线程安全的,并且永远也不需要额外的同步。比如String、Integer等final类型的类。

  • 绝对线程安全:不管运行时环境如何,调用者都不需要额外的同步措施。要做到这一点通常需要付出许多额外的代价,Java中标注自己是线程安全的类,实际上绝大多数都不是绝对线程安全的,不过绝对线程安全的类,Java中也有,比如说CopyOnWriteArrayList、CopyOnWriteArraySet .

  • 相对线程安全:相对线程安全也就是我们通常意义上所说的线程安全,像Vector这种,add、remove方法都是原子操作,不会被打断,但也仅限于此,如果有个线程在遍历某个Vector、有个线程同时在add这个Vector,99%的情况下都会出现ConcurrentModificationException,也就是 fail-fast机制 。

  • 线程非安全:这个就没什么好说的了,ArrayList、LinkedList、HashMap等都是线程非安全的类


>> 同步代码块

首先看一段示例代码:

class ATM {
private double balance;
public ATM(double balance) {
this.balance = balance;
}
public void draw(double drawMoney) {
try {
if(balance >= drawMoney) {
System.out.println(Thread.currentThread().getName()+"取出钞票: "+drawMoney);
Thread.sleep(1);
balance -= drawMoney;
System.out.println("ATM余额:"+balance);
} else {
System.out.println(Thread.currentThread().getName()+"取钱失败!余额不足!");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

public class DrawThread extends Thread {
private ATM atm;
private double drawMoney;
public DrawThread(String name, ATM atm, double drawMoney) {
super(name);
this.atm = atm;
this.drawMoney = drawMoney;
}
@Override
public void run() {
atm.draw(drawMoney); // 取钱
}

public static void main(String[] args) {
ATM atm = new ATM(1000);
new DrawThread("甲", atm, 800).start();
new DrawThread("乙", atm, 800).start();
}
}

// ------运行结果-------
// 甲取出钞票: 800.0
// 乙取出钞票: 800.0
// ATM余额:200.0
// ATM余额:-600.0

由于两个并发线程同时对一个ATM进行修改,而系统恰好在sleep(1)处进行线程切换,这就导致出现了错误。为了解决这个问题,Java多线程引入了同步监视器,使用同步监视器的通用方法就是同步代码块 —— 使用synchronized关键字:

synchronized(obj)
{
// 此处的代码就是同步代码块:同一时刻最多只有一个线程执行这段代码。
}

其中 obj 对象就是同步监视器,线程开始执行同步代码块之前,必须先获得对同步监视器的锁定,执行完后再释放对该同步监视器的锁定。

@Override
public void run() {
synchronized(atm) {
atm.draw(drawMoney);
}
}

对 run() 中的代码块进行同步以后,运行结果就正确了

甲取出钞票: 800.0
ATM余额:200.0
乙取钱失败!余额不足!



>> 同步方法

同步方法就是使用synchronized关键字修饰的方法。对于synchronized修饰的非静态方法,同步监视器是this,也就是调用该方法的对象。

通过使用同步方法,可以非常方便地实现线程安全的类,只需要对该可变类中可能修改竞争资源的方法进行同步即可。

class ATM {
private double balance;
public ATM(double balance) {
this.balance = balance;
}
public synchronized void draw(double drawMoney) {
try {
if(balance >= drawMoney) {
System.out.println(Thread.currentThread().getName()+"取出钞票: "+drawMoney);
Thread.sleep(1);
balance -= drawMoney;
System.out.println("ATM余额:"+balance);
} else {
System.out.println(Thread.currentThread().getName()+"取钱失败!余额不足!");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

draw()方法被synchronized修饰以后,ATM类就变成我们通常意义上的线程安全类了。


>> 同步锁(Lock)

除了synchronized,Java 5开始提供了一种功能更强大的线程同步机制 —— 同步锁Lock

另外,这里稍微提一下,Java 5提供了两种锁的根接口:

  • 同步锁Lock:提供了对共享资源的独占访问,每次只能有一个线程对Lock对象加锁。ReentrantLock是Lock接口的实现类。

  • 读写锁ReadWriteLock:虽然写和写、读和写是互斥的,但读和读不是互斥的。如果多个线程同时读数据 却加了同步锁,就降低了程序的性能。ReentrantReadWriteLock是 ReadWriteLock 接口的实现类,它实现了读写的分离, 读锁是共享的,写锁是独占的。

当然,这里只讨论同步锁,下面是一个使用示例:

import java.util.concurrent.locks.ReentrantLock;

class ATM {
private final ReentrantLock lock = new ReentrantLock(); // 同步锁
private double balance;
public ATM(double balance) {
this.balance = balance;
}
public void draw(double drawMoney) {
lock.lock(); // 加锁
try {
if(balance >= drawMoney) {
System.out.println(Thread.currentThread().getName()+"取出钞票: "+drawMoney);
Thread.sleep(1);
balance -= drawMoney;
System.out.println("ATM余额:"+balance);
} else {
System.out.println(Thread.currentThread().getName()+"取钱失败!余额不足!");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock(); // 释放锁
}
}
}



>> 死锁

当两个线程相互等待对方释放持有的锁时,就会发生死锁

那么如何编写一个死锁的Java程序呢?

  1. 两个线程里面分别持有两个Object对象:lock1和lock2;
  2. 线程A在 run() 方法中先获取lock1的对象锁,Thread.sleep(50),然后接着获取lock2的对象锁。
  3. 线程B在 run() 方法中先获取lock2的对象锁,接着获取lock1的对象锁。

线程A一旦sleep结束,线程B已经获取了lock2的对象锁。线程A尝试获取lock2的对象锁,便被阻塞;线程B尝试获取lock1的对象锁,也被阻塞,这样就形成了死锁。

6、线程通信

>> wait()、notify()与notifyAll()

Object类提供了以下三个方法,但这三个方法必须由同步监视器对象来调用:

  • wait():导致当前线程等待,直到其他线程调用该同步监视器对象的 notify() 或 notifyAll() 方法来唤醒该线程。
  • notify():唤醒在此(同步监视器)对象上等待的单个线程。如果有多个线程在等待这个对象锁,则任意选择唤醒其中一个线程。当前线程会等待剩余代码执行完毕才释放同步监视器的锁定。
  • notifyAll():唤醒在此(同步监视器)对象上等待的所有线程。当前线程会等待剩余代码执行完毕才释放同步监视器的锁定。被唤醒的那些线程会争夺对象锁,重新获得对象锁的线程才能从wait方法中返回,继续执行代码。

由于这三个方法必须由同步监视器对象来调用,所以适用于synchronized修饰的同步代码块同步方法。所以,我们才可以结合synchronized和这三个方法来实现线程之间的数据传递。

public class NotifyTest {
private String monitor = "This is a monitor object"; // 同步监控器对象
private double a, b;

class NotifyThread extends Thread {
public NotifyThread(String name) {
super(name);
}
@Override
public void run() {
try {
sleep(100); // 休眠100毫秒
} catch (Exception e) {
e.printStackTrace();
}
synchronized(monitor) { // 获取monitor对象锁
System.out.println(getName()+": "+a+" + "+b+" = "+(a+b));
monitor.notify(); // 唤醒在monitor对象上wait的线程
} // 同步代码块执行完毕,才释放锁
}
}

class WaitThread extends Thread {
public WaitThread(String name, double ax, double bx) {
super(name);
a = ax;
b = bx;
}
@Override
public void run() {
synchronized(monitor) { // 获取monitor对象锁
try {
System.out.println(getName()+": What's the result of "+a+" + "+b+"?");
monitor.wait(); // 释放monitor对象锁,并阻塞线程
System.out.println(getName()+": You're right.");
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

public static void main(String[] args) {
NotifyTest test = new NotifyTest();
NotifyThread notify = test.new NotifyThread("notify");
WaitThread wait = test.new WaitThread("waiter", 15, 23);
notify.start();
wait.start();
}
}

如上代码,当Notify线程sleep的时候,Wait线程获取到了monitor对象的控制权(锁),当调用monitor.wait()时,线程会释放monitor对象的锁并进入阻塞状态。接着Notify线程获取到了对象锁,当调用monitor.notify()时,唤醒一个在monitor对象上等待的线程,并在执行完剩余的同步代码块后,释放对象锁。被唤醒的Wait线程尝试重新获取对象锁,获取到以后线程将继续执行。

输出结果:

waiter: What's the result of 15.0 + 23.0?
notify: 15.0 + 23.0 = 38.0
waiter: You're right.



>> Condition类

当程序不使用 synchronized 而使用Lock对象来线程同步时,就不能使用 wait()、notify()、notifyAll()方法通信了。Java提供了一个Condition类来保持协调,通过 Lock 对象的newCondition()方法可以获取与该 Lock 对象相关联的 Condition 对象。

同样的,Condition类提供了三个方法:

  • await():类似于同步监视器的wait()方法。导致当前线程等待,直到其他现场调用该Condition对象的signal()signalAll()方法。
  • signal():类似于同步监控器的notify方法。唤醒在此 Lock 对象上等待的单个线程。
  • signalAll():类似于同步监视器的notifyAll()方法,唤醒在此 Lock 对象上等待的所有线程。
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.Condition;

public class NotifyTest {
private final ReentrantLock lock = new ReentrantLock();
private final Condition cond = lock.newCondition();
private double a, b;

class NotifyThread extends Thread {
public NotifyThread(String name) {
super(name);
}
@Override
public void run() {
try {
sleep(100);
} catch (Exception e) {
e.printStackTrace();
}

lock.lock(); // 获得锁
System.out.println(getName()+": "+a+" + "+b+" = "+(a+b));
cond.signal(); // 唤醒在lock对象上等待的线程
lock.unlock(); // 释放锁
}
}

class WaitThread extends Thread {
public WaitThread(String name, double ax, double bx) {
super(name);
a = ax;
b = bx;
}
@Override
public void run() {
lock.lock(); // 获得锁
try {
System.out.println(getName()+": What's the result of "+a+" + "+b+"?");
cond.await(); // 释放锁,并阻塞线程
System.out.println(getName()+": You're right.");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock(); // 释放锁
}
}
}

public static void main(String[] args) {
NotifyTest test = new NotifyTest();
NotifyThread notify = test.new NotifyThread("notify");
WaitThread wait = test.new WaitThread("waiter", 15, 23);
notify.start();
wait.start();
}
}

输出结果与前面一样。只是这里显式地使用 Lock 对象来同步,则需要使用 Condition 对象来暂停、唤醒指定线程。


>> BlockingQueue(阻塞队列)

阻塞队列BlockingQueueQueue的子接口,但它的主要用途并不是作为容器,而是作为线程通信的工具。 BlockingQueue具有一个特征:当生产者线程试图向 BlockingQueue 中放入元素(put方法)时,如果该队列已满,则该线程被阻塞;当消费者线程试图从 BlockingQueue 中取出元素(take方法)时,如果该队列已空,则该线程被阻塞。

BlockingQueue包含如下5个实现类:

  • ArrayBlockingQueue:基于数组实现的 BlockingQueue 队列。
  • LinkedBlockingQueue:基于链表实现的 BlockingQueue 队列。
  • PriorityBlockingQueue:按优先权有序的 BlockingQueue 队列,默认按元素(实现Comparable接口)本身的大小自然排序。
  • SynchronousQueue:同步队列,对该队列的存、取操作必须交替进行。
  • DelayQueue:要求队列元素都实现 Delay 接口,底层基于 PriorityBlockingQueue 实现,不过是根据元素的 getDelay() 方法的返回值进行排序。

以 ArrayBlockingQueue 为例:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ArrayBlockingQueue;

public class BlockingQueueTest {
private BlockingQueue<String> bq;
public BlockingQueueTest(int capacity) {
bq = new ArrayBlockingQueue<>(capacity);
}

class Producer extends Thread {
public Producer(String name) {
super(name);
}
@Override
public void run() {
String[] strs = new String[]{"Java","Python","Shell"};
for(int i=0; i<2; ++i) {
try {
sleep(200);
bq.put(strs[i%3]+getName());
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(getName()+"生产完成:"+bq);
}
}
}

class Consumer extends Thread {
public Consumer(String name) {
super(name);
}
@Override
public void run() {
while(true) {
try {
sleep(400);
//bq.take();
System.out.println(getName()+"消费完成:"+bq.take());
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

public static void main(String[] args) {
BlockingQueueTest test = new BlockingQueueTest(1);
test.new Consumer("Consumer").start();
test.new Producer("Producer-1").start();
test.new Producer("Producer-2").start();
test.new Producer("Producer-3").start();
}
}

这里创建了一个容量为1的 ArrayBlockingQueue,三个生产者线程,一个消费者线程。只要其中一个生产者线程向队列中放入元素,其他生产者线程就必须等待,等待消费者线程取出元素。输出结果如下:

Producer-1生产完成:[JavaProducer-1]
Consumer消费完成:JavaProducer-1
Producer-2生产完成:[JavaProducer-2]
Consumer消费完成:JavaProducer-2
Producer-3生产完成:[JavaProducer-3]
Consumer消费完成:JavaProducer-3
Producer-1生产完成:[PythonProducer-1]
Consumer消费完成:PythonProducer-1
Producer-2生产完成:[PythonProducer-2]
Consumer消费完成:PythonProducer-2
Producer-3生产完成:[PythonProducer-3]
Consumer消费完成:PythonProducer-3

7、线程中抛出异常

在Java多线程程序中,除了通过Callable方式创建线程时可以 throws Exception 之外,通过继承Thread类或实现Runnable接口的方式创建线程都是不能 throws Exception 的。“线程是独立执行的代码片断,线程的问题应该由线程自己来解决,而不要委托到外部。”基于这样的设计理念,在Java线程中的异常(无论是Checked异常还是Runtime异常),都应该在线程代码边界之内(run方法内)进行 try-catch 并处理掉。

Checked异常我们是都可以进行捕获的,但线程依然有可能抛出 Runtime 异常。当此类异常抛出时,如果我们想在线程代码边界之外(run方法之外)来捕获和处理这个异常的话,java为我们提供了一种线程内发生异常时能够在线程代码边界之外处理异常的回调机制,即Thread对象提供的两个方法来设置异常处理器:

  • static setDefaultUncaughtExceptionHandler(Thread.UncaughtExceptionHandler eh):为该线程类的所有线程实例设置默认的异常处理器。
  • setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler eh):为指定的线程实例设置异常处理器。

当一个线程抛出未处理异常时,JVM在结束该线程之前会自动查找是否有对应的Thread.UncaughtExceptionHandler对象,如果有,则会调用该对象的uncaughtException(Thread t, Throwable e)方法来处理该异常。

class MyExHandler implements Thread.UncaughtExceptionHandler {
public void uncaughtException(Thread t, Throwable e) {
System.out.println(t.getName() + "线程出现了异常:"+e);
}
}

public class ExHandlerThread extends Thread {
@Override
public void run() {
this.setUncaughtExceptionHandler(new MyExHandler());
int a = 5 / 0; // 这里将抛出未处理异常
System.out.println("程序正常结束");
}

public static void main(String[] args) {
new ExHandlerThread().start();
}
}

运行结果:

Thread-0线程出现了异常:java.lang.ArithmeticException: / by zero

可以看出,虽然异常处理器(MyExHandler对象)对线程抛出的 RuntimeException 进行来了处理,但该线程依然没有正常结束。这说明异常处理器机制与 try-catch 机制是不同的 —— 当catch捕获异常时,异常不会向上传播给上一级调用者;但使用异常处理器对异常进行处理之后,异常依然会传播给上一级调用者。

8、线程池

系统创建一个新线程是需要成本的,它涉及与操作系统交互。线程池(Thread Pool)就是为了降低这种成本,提高性能。—— 线程池在系统启动时即创建大量空闲的线程,当程序将一个 Runnable 对象或 Callable 对象传给线程池,线程池就会启动一个线程来执行它,当执行结束后该线程并不会死亡,而是再次返回线程池变成空闲状态。

Java 提供了一个Executors工厂类来产生线程池,该工厂类包含几个静态工厂方法:

  1. newCachedThreadPool():创建一个具有缓存功能的线程池,如果线程池大小超过了处理需要,就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又会自动添加新线程来处理任务。

  2. newFixedThreadPool(int nThreads):创建一个可重用的、具有固定线程数的线程池。

  3. newSingleThreadExecutor():创建一个只有单线程的线程池,相当于调用newFixedThreadPool(1)。

  4. newScheduledThreadPool(int corePoolSize):创建具有指定线程数的线程池,它可以在指定延迟后或周期性地执行任务。

  5. newSingleThreadScheduledExecutor():创建只有一个线程的线程池,它可以在指定延迟后或周期性地执行任务。

  6. newWorkStealingPool(int parallelism):创建持有足够线程的线程池来支持给定的并行级别,该线程池拥有多个任务队列。如果不给定参数,则默认的并行级别为CPU的个数。

前三个工厂方法返回一个ExecutorService对象,该对象代表一个线程池。第四第五个方法返回ScheduledExecutorService对象,它是 ExecutorService 的子接口,可以在指定延迟后或周期性地执行任务。最后一个方法是 Java 8新增的,它可以充分利用多CPU并行的能力。

import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;

public class ThreadPoolTest {
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(6); // 线程池

Runnable task = () -> {
for(int i=0; i<10; ++i) {
System.out.println(Thread.currentThread().getName()+"的i值为:"+i);
}
};

pool.submit(task); // 向线程池提交两个任务
pool.submit(task);
pool.shutdown(); // 关闭线程池
}
}

ExecutorService 和 ScheduledExecutorService 提供了以下几个重要的方法:

execute()   // 执行一个任务,返回值为void
submit() // 提交一个任务,返回值为Future对象
schedule() // 指定一个任务延迟多久后执行,这是ScheduledExecutorService才有的方法
shutdown() // 关闭线程池



ForkJoinPool

ForkJoinPool是一种特殊的线程池,是为了更好地支持在多核CPU上的并行。它可以将一个任务拆分成多个“小任务”并行计算,再把多个“小任务”的结果合并成总的计算结果。ForkJoinPool的构造器如下:

ForkJoinPool(int parallel)  //创建一个包含 parallel 个并行线程的线程池,默认为CPU的个数

创建了实例以后,就可以调用 ForkJoinPool 对象的submit(ForkJoinTask task)来执行指定任务。其中,ForkJoinTask是一个抽象类,代表一个可并行、合并的任务,ForkJoinTask还有两个抽象子类:

  • RecursiveAction:代表没有返回值的任务;
  • RecursiveTask:代表有返回值的任务。

下面给一个示例:

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;

class ParallelTask extends RecursiveAction {
private static final int THRESHOLD = 50;
private int start,end;
public ParallelTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
public void compute() {
if(end-start < THRESHOLD) {
for(int i=start;i<end;++i) {
System.out.println(Thread.currentThread().getName()+"的i值:"+i);
}
} else { // 将大任务分解成小任务
int middle = (start+end)/2;
ParallelTask left = new ParallelTask(start, middle);
ParallelTask right = new ParallelTask(middle, end);
left.fork(); // 并行执行两个小任务
right.fork();
}
}
}

public class ForkJoinPoolTest {
public static void main(String[] args) throws Exception {
ForkJoinPool pool = new ForkJoinPool();
pool.submit(new ParallelTask(0, 300)); // 提交可分解的 ParallelTask 任务
pool.awaitTermination(2, TimeUnit.SECONDS);
pool.shutdown();
}
}

可以看出,打印0~300数字这个“大任务”被拆分成了多个“小任务”并行地执行。每个“小任务”不能打印超过50个数字。

9、线程相关类

>> ThreadLocal类

ThreadLocal类是为了解决在多线程中对同一变量的访问冲突。ThreadLocal通过给每一个使用它的线程提供一个变量的副本,这样每个线程都可以独立地改变自己的副本,不会与其他线程冲突。

ThreadLocal并不能替代同步机制,它们的功能不同:同步机制是为了同步多个线程对共享资源的并发访问,是线程之间通信的有效方式;而 ThreadLocal 是为了隔离多个线程的数据,从根本上避免多个线程对资源的竞争。

import java.util.HashSet;

public class ThreadLocalTest {
private static ThreadLocal<HashSet<String>> name = new ThreadLocal<>();

class MyThread extends Thread {
@Override
public void run() {
HashSet<String> s = new HashSet<>();
for(int i=0; i<3; ++i) {
s.add(getName()+"-"+getName().hashCode()+i);
}
name.set(s);
System.out.println(getName()+":\n"+name.get());
}
}

public static void main(String[] args) {
ThreadLocalTest test = new ThreadLocalTest();
test.new MyThread().start();
test.new MyThread().start();
test.new MyThread().start();
}
}

可以看出,ThreadLocal使得各线程能够保持各自独立的一个对象,是通过每个线程中的 new 对象 的操作来创建的对象,每个线程创建一个并ThreadLocal.set()添加,不是什么对象的拷贝。实现的思路是:在ThreadLocal类中有一个Map,用于存储每一个线程的变量副本,Map中元素的 key 为线程对象,而 value 对应线程的变量副本。


>> 包装非线程安全的集合

在《Java基础笔记(二)》中介绍的ArrayListLinkedListHashSetTreeSetHashMapTreeMap等等集合都是线程不安全的。当在多线程环境中访问这些集合时,可以使用Collections提供的静态方法将这些集合包装成线程安全的集合:

  • synchronizedCollection()
  • synchronizedList()
  • synchronizedMap()
  • synchronizedSet()
  • synchronizedSortedMap()
  • synchronizedSortedSet()

>> 线程安全的集合类

java.util.concurrent包下提供了很多支持高效并发访问的集合接口和实现类,这些线程安全的集合类大致可以分为两类:

  1. Concurrent开头的集合类,比如ConcurrentHashMapConcurrentSkipListMapConcurrentSkipListSetConcurrentLinkedQueueConcurrentLinkedDeque

  2. CopyOnWrite开头的集合类,比如CopyOnWriteArrayListCopyOnWriteArraySet

其中以Concurrent开头的集合类支持高效并发访问,它们都是线程安全的,且保证永远不会锁住整个集合。例如ConcurrentHashMap默认支持16个线程并发写入,无须等待。

而以CopyOnWrite开头的集合类,正如名字所暗示的,它们采用复制底层数组的方式实现写操作 —— 例如CopyOnWriteArrayList当执行读操作时,直接读取集合本身,无须加锁与阻塞;当执行写操作时,在底层复制一份新的数组,对新的数组执行写操作,修改完成以后将新数组的引用赋回去。(适用于读操作远远多于写操作的场景)


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK