8

11个知识点轻松掌握Java线程同步与实现

 3 years ago
source link: https://my.oschina.net/u/4678580/blog/4870569
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

为何要使用Java线程同步? Java允许多线程并发控制,当多个线程同时操作一个可共享的资源变量时,将会导致数据不准确,相互之间产生冲突,因此加入同步锁以避免在该线程没有完成操作之前,被其他线程的调用,从而保证了该变量的唯一性和准确性。小编这里总结了一份多线程并发编程的思维导图,关注公众号:麒麟改bug。

但其并发编程的根本,就是使线程间进行正确的通信。其中两个比较重要的关键点,如下:

线程通信:重点关注线程同步的几种方式; 正确通信:重点关注是否有线程安全问题; Java中提供了很多线程同步操作,比如:synchronized关键字、wait/notifyAll、ReentrantLock、Condition、一些并发包下的工具类、Semaphore,ThreadLocal、AbstractQueuedSynchronizer等。本文主要说明一下这几种同步方式的使用及优劣。

1 ReentrantLock可重入锁

自JDK5开始,新增了Lock接口以及它的一个实现类ReentrantLock。ReentrantLock可重入锁是J.U.C包内置的一个锁对象,可以用来实现同步,基本使用方法如下:

`public class ReentrantLockTest {

private ReentrantLock lock = new ReentrantLock();

public void execute() {
    lock.lock();
    try {
        System.out.println(Thread.currentThread().getName() + " do something synchronize");
        try {
            Thread.sleep(5000l);
        } catch (InterruptedException e) {
            System.err.println(Thread.currentThread().getName() + " interrupted");
            Thread.currentThread().interrupt();
        }
    } finally {
        lock.unlock();
    }
}

public static void main(String[] args) {
    ReentrantLockTest reentrantLockTest = new ReentrantLockTest();
    Thread thread1 = new Thread(new Runnable() {
        [@Override](https://my.oschina.net/u/1162528)
        public void run() {
            reentrantLockTest.execute();
        }
    });
    Thread thread2 = new Thread(new Runnable() {
        [@Override](https://my.oschina.net/u/1162528)
        public void run() {
            reentrantLockTest.execute();
        }
    });
    thread1.start();
    thread2.start();
}

}` 上面例子表示 同一时间段只能有1个线程执行execute方法,输出如下:

Thread-0 do something synchronize // 隔了5秒钟 输入下面 Thread-1 do something synchronize 可重入锁中可重入表示的意义在于 对于同一个线程,可以继续调用加锁的方法,而不会被挂起。可重入锁内部维护一个计数器,对于同一个线程调用lock方法,计数器+1,调用unlock方法,计数器-1。

举个例子再次说明一下可重入的意思:在一个加锁方法execute中调用另外一个加锁方法anotherLock并不会被挂起,可以直接调用(调用execute方法时计数器+1,然后内部又调用了anotherLock方法,计数器+1,变成了2):

`public void execute() {
    lock.lock();
    try {
        System.out.println(Thread.currentThread().getName() + " do something synchronize");
        try {
            anotherLock();
            Thread.sleep(5000l);
        } catch (InterruptedException e) {
            System.err.println(Thread.currentThread().getName() + " interrupted");
            Thread.currentThread().interrupt();
        }
    } finally {
        lock.unlock();
    }
}

public void anotherLock() {
    lock.lock();
    try {
        System.out.println(Thread.currentThread().getName() + " invoke anotherLock");
    } finally {
        lock.unlock();
    }
}`
输出:

`Thread-0 do something synchronize
Thread-0 invoke anotherLock
// 隔了5秒钟 输入下面
Thread-1 do something synchronize
Thread-1 invoke anotherLock`

2 synchronized

synchronized跟ReentrantLock一样,也支持可重入锁。但是它是 一个关键字,是一种语法级别的同步方式,称为内置锁:

`public class SynchronizedKeyWordTest {

public synchronized void execute() {
        System.out.println(Thread.currentThread().getName() + " do something synchronize");
    try {
        anotherLock();
        Thread.sleep(5000l);
    } catch (InterruptedException e) {
        System.err.println(Thread.currentThread().getName() + " interrupted");
        Thread.currentThread().interrupt();
    }
}

public synchronized void anotherLock() {
    System.out.println(Thread.currentThread().getName() + " invoke anotherLock");
}

public static void main(String[] args) {
    SynchronizedKeyWordTest reentrantLockTest = new SynchronizedKeyWordTest();
    Thread thread1 = new Thread(new Runnable() {
        [@Override](https://my.oschina.net/u/1162528)
        public void run() {
            reentrantLockTest.execute();
        }
    });
    Thread thread2 = new Thread(new Runnable() {
        [@Override](https://my.oschina.net/u/1162528)
        public void run() {
            reentrantLockTest.execute();
        }
    });
    thread1.start();
    thread2.start();
}

}` 输出结果跟ReentrantLock一样,这个例子说明内置锁可以作用在方法上。synchronized关键字也可以修饰静态方法,此时如果调用该静态方法,将会锁住整个类。

同步是一种高开销的操作,因此应该尽量减少同步的内容。通常没有必要同步整个方法,使用synchronized代码块同步关键代码即可。

synchronized跟ReentrantLock相比,有几点局限性:

加锁的时候不能设置超时。ReentrantLock有提供tryLock方法,可以设置超时时间,如果超过了这个时间并且没有获取到锁,就会放弃,而synchronized却没有这种功能; ReentrantLock可以使用多个Condition,而synchronized却只能有1个 不能中断一个试图获得锁的线程; ReentrantLock可以选择公平锁和非公平锁; ReentrantLock可以获得正在等待线程的个数,计数器等; 所以,Lock的操作与synchronized相比,灵活性更高,而且Lock提供多种方式获取锁,有Lock、ReadWriteLock接口,以及实现这两个接口的ReentrantLock类、ReentrantReadWriteLock类。

关于Lock对象和synchronized关键字选择的考量:

最好两个都不用,使用一种java.util.concurrent包提供的机制,能够帮助用户处理所有与锁相关的代码。 如果synchronized关键字能满足用户的需求,就用synchronized,因为它能简化代码。 如果需要更高级的功能,就用ReentrantLock类,此时要注意及时释放锁,否则会出现死锁,通常在finally代码释放锁。 在性能考量上来说,如果竞争资源不激烈,两者的性能是差不多的,而当竞争资源非常激烈时(即有大量线程同时竞争),此时Lock的性能要远远优于synchronized。所以说,在具体使用时要根据适当情况选择。

3 Condition条件对象

Condition条件对象的意义在于 对于一个已经获取Lock锁的线程,如果还需要等待其他条件才能继续执行的情况下,才会使用Condition条件对象。

Condition可以替代传统的线程间通信,用await()替换wait(),用signal()替换notify(),用signalAll()替换notifyAll()。

为什么方法名不直接叫wait()/notify()/nofityAll()?因为Object的这几个方法是final的,不可重写!

public class ConditionTest {

public static void main(String[] args) {
    ReentrantLock lock = new ReentrantLock();
    Condition condition = lock.newCondition();
    Thread thread1 = new Thread(new Runnable() {
        [@Override](https://my.oschina.net/u/1162528)
        public void run() {
            lock.lock();
            try {
                System.out.println(Thread.currentThread().getName() + " run");
                System.out.println(Thread.currentThread().getName() + " wait for condition");
                try {
                    condition.await();
                    System.out.println(Thread.currentThread().getName() + " continue");
                } catch (InterruptedException e) {
                    System.err.println(Thread.currentThread().getName() + " interrupted");
                    Thread.currentThread().interrupt();
                }
            } finally {
                lock.unlock();
            }
        }
    });
    Thread thread2 = new Thread(new Runnable() {
        @Override
        public void run() {
            lock.lock();
            try {
                System.out.println(Thread.currentThread().getName() + " run");
                System.out.println(Thread.currentThread().getName() + " sleep 5 secs");
                try {
                    Thread.sleep(5000l);
                } catch (InterruptedException e) {
                    System.err.println(Thread.currentThread().getName() + " interrupted");
                    Thread.currentThread().interrupt();
                }
                condition.signalAll();
            } finally {
                lock.unlock();
            }
        }
    });
    thread1.start();
    thread2.start();
}

} 这个例子中thread1执行到condition.await()时,当前线程会被挂起,直到thread2调用了condition.signalAll()方法之后,thread1才会重新被激活执行。

这里需要注意的是thread1调用Condition的await方法之后,thread1线程释放锁,然后马上加入到Condition的等待队列,由于thread1释放了锁,thread2获得锁并执行,thread2执行signalAll方法之后,Condition中的等待队列thread1被取出并加入到AQS中,接下来thread2执行完毕之后释放锁,由于thread1已经在AQS的等待队列中,所以thread1被唤醒,继续执行。

传统线程的通信方式,Condition都可以实现。Condition的强大之处在于它可以为多个线程间建立不同的Condition。

注意,Condition是被绑定到Lock上的,要创建一个Lock的Condition必须用newCondition()方法。

4 wait¬ify/notifyAll方式

Java线程的状态转换图与相关方法,如下:

在图中,红框标识的部分方法,可以认为已过时,不再使用。上图中的方法能够参与到线程同步中的方法,如下:

1.wait、notify、notifyAll方法:

wait/notifyAll方式跟ReentrantLock/Condition方式的原理是一样的。 Java中每个对象都拥有一个内置锁,在内置锁中调用wait,notify方法相当于调用锁的Condition条件对象的await和signalAll方法。

public class WaitNotifyAllTest {

public synchronized void doWait() {
    System.out.println(Thread.currentThread().getName() + " run");
    System.out.println(Thread.currentThread().getName() + " wait for condition");
    try {
        this.wait();
        System.out.println(Thread.currentThread().getName() + " continue");
    } catch (InterruptedException e) {
        System.err.println(Thread.currentThread().getName() + " interrupted");
        Thread.currentThread().interrupt();
    }
}

public synchronized void doNotify() {
    try {
        System.out.println(Thread.currentThread().getName() + " run");
        System.out.println(Thread.currentThread().getName() + " sleep 5 secs");
        Thread.sleep(5000l);
        this.notifyAll();
    } catch (InterruptedException e) {
        System.err.println(Thread.currentThread().getName() + " interrupted");
        Thread.currentThread().interrupt();
    }
}

public static void main(String[] args) {
    WaitNotifyAllTest waitNotifyAllTest = new WaitNotifyAllTest();
    Thread thread1 = new Thread(new Runnable() {
        @Override
        public void run() {
            waitNotifyAllTest.doWait();
        }
    });
    Thread thread2 = new Thread(new Runnable() {
        @Override
        public void run() {
            waitNotifyAllTest.doNotify();
        }
    });
    thread1.start();
    thread2.start();
}

} 这里需要注意的是 调用wait/notifyAll方法的时候一定要获得当前线程的锁,否则会发生IllegalMonitorStateException异常。

2.线程中通信可以使用的方法。

线程中调用了wait方法,则进入阻塞状态,只有等另一个线程调用与wait同一个对象的notify方法。这里有个特殊的地方,调用wait或者notify,前提是需要获取锁,也就是说,需要在同步块中做以上操作。

3.join方法:

该方法主要作用是在该线程中的run方法结束后,才往下执行。

`package com.thread.simple;
 
public class ThreadJoin {
    public static void main(String[] args) {
        Thread thread= new Thread(new Runnable() {
              @Override
              public void run() {
                   System.err.println("线程"+Thread.currentThread().getId()+" 打印信息");
              }
        });
        thread.start();`

    try {
        thread.join();
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    System.err.println("主线程打印信息");   
}

4.yield方法:

线程本身的调度方法,使用时线程可以在run方法执行完毕时,调用该方法,告知线程已可以出让CPU资源。

public class Test1 {  
    public static void main(String[] args) throws InterruptedException {  
        new MyThread("低级", 1).start();  
        new MyThread("中级", 5).start();  
        new MyThread("高级", 10).start();  
    }  
}  

class MyThread extends Thread {  
    public MyThread(String name, int pro) {  
        super(name);// 设置线程的名称  
        this.setPriority(pro);// 设置优先级  
    }  

    @Override  
    public void run() {  
        for (int i = 0; i < 30; i++) {  
            System.out.println(this.getName() + "线程第" + i + "次执行!");  
            if (i % 5 == 0)  
                Thread.yield();  
        }  
    }  
}  

5.sleep方法: 通过sleep(millis)使线程进入休眠一段时间,该方法在指定的时间内无法被唤醒,同时也不会释放对象锁;

  • 可以明显看到打印的数字在时间上有些许的间隔 */ public class Test1 {
    public static void main(String[] args) throws InterruptedException {
    for(int i=0;i<100;i++){
    System.out.println("main"+i);
    Thread.sleep(100);
    }
    }
    } sleep方法告诉操作系统 至少在指定时间内不需为线程调度器为该线程分配执行时间片,并不释放锁(如果当前已经持有锁)。实际上,调用sleep方法时并不要求持有任何锁。

所以,sleep方法并不需要持有任何形式的锁,也就不需要包裹在synchronized中。

5 ThreadLocal

ThreadLocal是一种把变量放到线程本地的方式来实现线程同步的。比如:SimpleDateFormat不是一个线程安全的类,可以使用ThreadLocal实现同步,如下:

`public class ThreadLocalTest {

private static ThreadLocal<SimpleDateFormat> dateFormatThreadLocal = new ThreadLocal<SimpleDateFormat>() {
    @Override
    protected SimpleDateFormat initialValue() {
        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    }
};

public static void main(String[] args) {
    Thread thread1 = new Thread(new Runnable() {
        @Override
        public void run() {
            Date date = new Date();
            System.out.println(dateFormatThreadLocal.get().format(date));
        }
    });
    Thread thread2 = new Thread(new Runnable() {
        @Override
        public void run() {
            Date date = new Date();
            System.out.println(dateFormatThreadLocal.get().format(date));
        }
    });
    thread1.start();
    thread2.start();
}

}` ThreadLocal与同步机制的对比选择:

ThreadLocal与同步机制都是 为了解决多线程中相同变量的访问冲突问题。 前者采用以 "空间换时间" 的方法,后者采用以 "时间换空间" 的方式。

6 volatile修饰变量

volatile关键字为域变量的访问提供了一种免锁机制,使用volatile修饰域相当于告诉虚拟机该域可能会被其他线程更新,因此每次使用该域就要重新计算,而不是使用寄存器中的值,volatile不会提供任何原子操作,它也不能用来修饰final类型的变量。

//只给出要修改的代码,其余代码与上同 public class Bank { //需要同步的变量加上volatile private volatile int account = 100; public int getAccount() { return account; } //这里不再需要synchronized public void save(int money) { account += money; } } 多线程中的非同步问题主要出现在对域的读写上,如果让域自身避免这个问题,则就不需要修改操作该域的方法。用final域,有锁保护的域和volatile域可以避免非同步的问题。

7 Semaphore信号量

Semaphore信号量被用于控制特定资源在同一个时间被访问的个数。类似连接池的概念,保证资源可以被合理的使用。可以使用构造器初始化资源个数:

public class SemaphoreTest {

private static Semaphore semaphore = new Semaphore(2);

public static void main(String[] args) {
    for(int i = 0; i < 5; i ++) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName() + " " + new Date());
                    Thread.sleep(5000l);
                    semaphore.release();
                } catch (InterruptedException e) {
                    System.err.println(Thread.currentThread().getName() + " interrupted");
                }
            }
        }).start();
    }
}

Thread-1 Mon Apr 18 18:03:46 CST 2016 Thread-0 Mon Apr 18 18:03:46 CST 2016 Thread-3 Mon Apr 18 18:03:51 CST 2016 Thread-2 Mon Apr 18 18:03:51 CST 2016 Thread-4 Mon Apr 18 18:03:56 CST 2016

8 并发包下的工具类

8.1 CountDownLatch

CountDownLatch是一个计数器,它的构造方法中需要设置一个数值,用来设定计数的次数。每次调用countDown()方法之后,这个计数器都会减去1,CountDownLatch会一直阻塞着调用await()方法的线程,直到计数器的值变为0。

public class CountDownLatchTest {

public static void main(String[] args) {
    CountDownLatch countDownLatch = new CountDownLatch(5);
    for(int i = 0; i < 5; i ++) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + " " + new Date() + " run");
                try {
                    Thread.sleep(5000l);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                countDownLatch.countDown();
            }
        }).start();
    }
    try {
        countDownLatch.await();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("all thread over");
}

Thread-2 Mon Apr 18 18:18:30 CST 2016 run Thread-3 Mon Apr 18 18:18:30 CST 2016 run Thread-4 Mon Apr 18 18:18:30 CST 2016 run Thread-0 Mon Apr 18 18:18:30 CST 2016 run Thread-1 Mon Apr 18 18:18:30 CST 2016 run all thread over

8.2 CyclicBarrier

CyclicBarrier阻塞调用的线程,直到条件满足时,阻塞的线程同时被打开。

调用await()方法的时候,这个线程就会被阻塞,当调用await()的线程数量到达屏障数的时候,主线程就会取消所有被阻塞线程的状态。

在CyclicBarrier的构造方法中,还可以设置一个barrierAction。在所有的屏障都到达之后,会启动一个线程来运行这里面的代码。

public class CyclicBarrierTest {

public static void main(String[] args) {
    Random random = new Random();
    CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
    for(int i = 0; i < 5; i ++) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                int secs = random.nextInt(5);
                System.out.println(Thread.currentThread().getName() + " " + new Date() + " run, sleep " + secs + " secs");
                try {
                    Thread.sleep(secs * 1000);
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + " " + new Date() + " runs over");
            }
        }).start();
    }
}

} 相比CountDownLatch,CyclicBarrier是可以被循环使用的,而且遇到线程中断等情况时,还可以利用reset()方法,重置计数器,从这些方面来说,CyclicBarrier会比CountDownLatch更加灵活一些。

9 使用原子变量实现线程同步

有时需要使用线程同步的根本原因在于 对普通变量的操作不是原子的。那么什么是原子操作呢?

原子操作就是指将读取变量值、修改变量值、保存变量值看成一个整体来操作 即-这几种行为要么同时完成,要么都不完成。

在java.util.concurrent.atomic包中提供了创建原子类型变量的工具类,使用该类可以简化线程同步。比如:其中AtomicInteger以原子方式更新int的值:

class Bank { private AtomicInteger account = new AtomicInteger(100);

public AtomicInteger getAccount() {
    return account;
}

public void save(int money) {
    account.addAndGet(money);
}

10 AbstractQueuedSynchronizer

AQS是很多同步工具类的基础,比如:ReentrantLock里的公平锁和非公平锁,Semaphore里的公平锁和非公平锁,CountDownLatch里的锁等他们的底层都是使用AbstractQueuedSynchronizer完成的。

基于AbstractQueuedSynchronizer自定义实现一个独占锁:

public class MySynchronizer extends AbstractQueuedSynchronizer {

@Override
protected boolean tryAcquire(int arg) {
    if(compareAndSetState(0, 1)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    return false;
}

@Override
protected boolean tryRelease(int arg) {
    setState(0);
    setExclusiveOwnerThread(null);
    return true;
}

public void lock() {
    acquire(1);
}

public void unlock() {
    release(1);
}

public static void main(String[] args) {
    MySynchronizer mySynchronizer = new MySynchronizer();
    Thread thread1 = new Thread(new Runnable() {
        @Override
        public void run() {
            mySynchronizer.lock();
            try {
                System.out.println(Thread.currentThread().getName() + " run");
                System.out.println(Thread.currentThread().getName() + " will sleep 5 secs");
                try {
                    Thread.sleep(5000l);
                    System.out.println(Thread.currentThread().getName() + " continue");
                } catch (InterruptedException e) {
                    System.err.println(Thread.currentThread().getName() + " interrupted");
                    Thread.currentThread().interrupt();
                }
            } finally {
                mySynchronizer.unlock();
            }
        }
    });
    Thread thread2 = new Thread(new Runnable() {
        @Override
        public void run() {
            mySynchronizer.lock();
            try {
                System.out.println(Thread.currentThread().getName() + " run");
            } finally {
                mySynchronizer.unlock();
            }
        }
    });
    thread1.start();
    thread2.start();
}

11 使用阻塞队列实现线程同步

前面几种同步方式都是基于底层实现的线程同步,但是在实际开发当中,应当尽量远离底层结构。本节主要是使用LinkedBlockingQueue来实现线程的同步。

LinkedBlockingQueue是一个基于链表的队列,先进先出的顺序(FIFO),范围任意的blocking queue。

package com.xhj.thread;

import java.util.Random; import java.util.concurrent.LinkedBlockingQueue;

  • 用阻塞队列实现线程同步 LinkedBlockingQueue的使用 / public class BlockingSynchronizedThread { /*
    • 定义一个阻塞队列用来存储生产出来的商品 / private LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>(); /*
    • 定义生产商品个数 / private static final int size = 10; /*
    • 定义启动线程的标志,为0时,启动生产商品的线程;为1时,启动消费商品的线程 */
package com.xhj.thread;

import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * 用阻塞队列实现线程同步 LinkedBlockingQueue的使用
 */
public class BlockingSynchronizedThread {
    /**
     * 定义一个阻塞队列用来存储生产出来的商品
     */
    private LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
    /**
     * 定义生产商品个数
     */
    private static final int size = 10;
    /**
     * 定义启动线程的标志,为0时,启动生产商品的线程;为1时,启动消费商品的线程
     */
    private int flag = 0;

    private class LinkBlockThread implements Runnable {
        @Override
        public void run() {
            int new_flag = flag++;
            System.out.println("启动线程 " + new_flag);
            if (new_flag == 0) {
                for (int i = 0; i < size; i++) {
                    int b = new Random().nextInt(255);
                    System.out.println("生产商品:" + b + "号");
                    try {
                        queue.put(b);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    System.out.println("仓库中还有商品:" + queue.size() + "个");
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            } else {
                for (int i = 0; i < size / 2; i++) {
                    try {
                        int n = queue.take();
                        System.out.println("消费者买去了" + n + "号商品");
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    System.out.println("仓库中还有商品:" + queue.size() + "个");
                    try {
                        Thread.sleep(100);
                    } catch (Exception e) {
                        // TODO: handle exception
                    }
                }
            }
        }
    }

    public static void main(String[] args) {
        BlockingSynchronizedThread bst = new BlockingSynchronizedThread();
        LinkBlockThread lbt = bst.new LinkBlockThread();
        Thread thread1 = new Thread(lbt);
        Thread thread2 = new Thread(lbt);
        thread1.start();
        thread2.start();
    }
}

文章到这里就结束了

喜欢小编分享的技术文章可以点赞关注哦!小编这边整理了一些Java核心知识技术280多页的资料集锦,2020最新总结的面试集锦资料关注公众号:麒麟改bug。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK