Java高并发编程之Synchronizer

# Java高并发编程之Synchronizer

Synchronizer是一个对象,它根据本身的状态调节线程的控制流。阻塞队列可以扮演一个Synchronizer的角色;其他类型的 Synchronizer包括信号量( semaphore)、以及闭锁( latch).在平台类库中存在一些 Synchronizer类;如果这些不能满你同样可以创建一个你自己的 Synchronizer。

# CountDownLatch

CountDownLatch是JDK提供的一个同步工具,它可以让一个或多个线程等待,一直等到其他线程中执行完成一组操作。

# 常用的方法

countDown()await():CountDownLatch在初始化时,需要指定用给定一个整数作为计数器。当调用countDown方法时,计数器会被减1;当调用await方法时,如果计数器大于0时,线程会被阻塞,一直到计数器被countDown方法减到0时,线程才会继续执行。计数器是无法重置的,当计数器被减到0时,调用await方法都会直接返回。

# 示例

class Driver { // ...
    void main() throws InterruptedException {
        CountDownLatch startSignal = new CountDownLatch(1);
        CountDownLatch doneSignal = new CountDownLatch(N);

        for (int i = 0; i < N; ++i) // create and start threads
            new Thread(new Worker(startSignal, doneSignal)).start();

        doSomethingElse();            // don't let run yet
        startSignal.countDown();      // let all threads proceed
        doSomethingElse();
        doneSignal.await();           // wait for all to finish
    }
}

class Worker implements Runnable {
    private final CountDownLatch startSignal;
    private final CountDownLatch doneSignal;
    Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
        this.startSignal = startSignal;
        this.doneSignal = doneSignal;
    }
    public void run() {
        try {
            startSignal.await();
            doWork();
            doneSignal.countDown();
        } catch (InterruptedException ex) {} // return;
    }

    void doWork() { ... }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

# 实现原理

CountDownLatch有一个内部类叫做Sync,它继承了AbstractQueuedSynchronizer类,其中维护了一个整数state,并且保证了修改state的可见性和原子性。创建CountDownLatch对象的实例时,同时也会创建一个Sync的实例,同时把计数器的值传给Sync实例:

public CountDownLatch(int count) {
   if (count < 0) throw new IllegalArgumentException("count < 0");
   this.sync = new Sync(count);
}
1
2
3
4

countDown()中,只调用了Sync实例的releaseShared()releaseShared(1)先对计数器进行减1操作,如果减1后的计数器为0,唤醒被await方法阻塞的所有线程。

public void countDown() {
    sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {  // 使用CAS尝试对计数器进行自减操作
        doReleaseShared();        // 如果计数器为0,唤醒被await方法阻塞的所有线程
        return true;
    }
    return false;
}
1
2
3
4
5
6
7
8
9
10
11

await(),只调用了Sync实例的acquireSharedInterruptibly方法,其中acquireSharedInterruptibly()会判断计数器是否为0,如果不为0则阻塞当前线程。

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

/* tryAcquireShared方法,是AbstractQueuedSynchronizer中的一个模板方法,其具体实现在Sync类中,其主要
 * 是判断计数器是否为零,如果为零则返回1,如果不为零则返回-1。
 */
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
1
2
3
4
5
6
7
8
9
10
11
12
13

# CyclicBarrier

CyclicBarrier也叫同步屏障、循环栅栏,可以让一组线程到达屏障时被阻塞,直到最后一个线程到达后,他们一起被执行.

参考CountDownLatch, CyclicBarrier允许重复执行,即为计数器可重置。

# CyclicBarrier和CountDownLatch的区别

  • CountDownLatch的计数器只能使用一次。而CyclicBarrier的计数器可以使用reset() 方法重置。所以CyclicBarrier能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次。
  • CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得CyclicBarrier阻塞的线程数量。isBroken方法用来知道阻塞的线程是否被中断。

# Semaphore

计数信号量用来控制能够同时访问特定资源的活动的数量,常用于实现简单的生产者消费者模型。

# 示例

生产者线程put元素到队列,若队列满则组赛到队列有空间;消费者不断从队列take获取元素,若队列空则组赛道队列有元素。

class Producer implements Runnable {
    private final BlockingQueue queue;
    Producer(BlockingQueue q) { queue = q; }
    public void run() {
        try {
            while (true) { queue.put(produce()); }
        } catch (InterruptedException ex) { ... handle ...}
    }
    Object produce() { ... }
}

class Consumer implements Runnable {
    private final BlockingQueue queue;
    Consumer(BlockingQueue q) { queue = q; }
    public void run() {
        try {
            while (true) { consume(queue.take()); }
        } catch (InterruptedException ex) { ... handle ...}
    }
    void consume(Object x) { ... }
}

class Setup {
    void main() {
        BlockingQueue q = new SomeQueueImplementation();
        Producer p = new Producer(q);
        Consumer c1 = new Consumer(q);
        Consumer c2 = new Consumer(q);
        new Thread(p).start();
        new Thread(c1).start();
        new Thread(c2).start();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
上次更新: 2020/07/29, 14:07:00
最近更新
01
RabbitMQ简介
10-27
02
聊聊Java多态
10-21
03
JVM垃圾回收器
10-16
更多文章>