Java高并发编程之Synchronizer
# Java高并发编程之Synchronizer
Synchronizer是一个对象,它根据本身的状态调节线程的控制流。阻塞队列可以扮演一个Synchronizer的角色;其他类型的 Synchronizer包括信号量( semaphore)、以及闭锁( latch).在平台类库中存在一些 Synchronizer类;如果这些不能满你同样可以创建一个你自己的 Synchronizer。
# CountDownLatch
CountDownLatch是JDK提供的一个同步工具,它可以让一个或多个线程等待,一直等到其他线程中执行完成一组操作。
# 常用的方法
countDown()和await():CountDownLatch在初始化时,需要指定用给定一个整数作为计数器。当调用countDown方法时,计数器会被减1;当调用await方法时,如果计数器大于0时,线程会被阻塞,一直到计数器被countDown方法减到0时,线程才会继续执行。计数器是无法重置的,当计数器被减到0时,调用await方法都会直接返回。
# 示例
class Driver { // ...
void main() throws InterruptedException {
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(N);
for (int i = 0; i < N; ++i) // create and start threads
new Thread(new Worker(startSignal, doneSignal)).start();
doSomethingElse(); // don't let run yet
startSignal.countDown(); // let all threads proceed
doSomethingElse();
doneSignal.await(); // wait for all to finish
}
}
class Worker implements Runnable {
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
}
public void run() {
try {
startSignal.await();
doWork();
doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
}
void doWork() { ... }
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# 实现原理
CountDownLatch有一个内部类叫做Sync,它继承了AbstractQueuedSynchronizer类,其中维护了一个整数state,并且保证了修改state的可见性和原子性。创建CountDownLatch对象的实例时,同时也会创建一个Sync的实例,同时把计数器的值传给Sync实例:
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
2
3
4
在countDown()中,只调用了Sync实例的releaseShared()。releaseShared(1)先对计数器进行减1操作,如果减1后的计数器为0,唤醒被await方法阻塞的所有线程。
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { // 使用CAS尝试对计数器进行自减操作
doReleaseShared(); // 如果计数器为0,唤醒被await方法阻塞的所有线程
return true;
}
return false;
}
2
3
4
5
6
7
8
9
10
11
在await(),只调用了Sync实例的acquireSharedInterruptibly方法,其中acquireSharedInterruptibly()会判断计数器是否为0,如果不为0则阻塞当前线程。
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
/* tryAcquireShared方法,是AbstractQueuedSynchronizer中的一个模板方法,其具体实现在Sync类中,其主要
* 是判断计数器是否为零,如果为零则返回1,如果不为零则返回-1。
*/
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
2
3
4
5
6
7
8
9
10
11
12
13
# CyclicBarrier
CyclicBarrier也叫同步屏障、循环栅栏,可以让一组线程到达屏障时被阻塞,直到最后一个线程到达后,他们一起被执行.
参考CountDownLatch, CyclicBarrier允许重复执行,即为计数器可重置。
# CyclicBarrier和CountDownLatch的区别
- CountDownLatch的计数器只能使用一次。而CyclicBarrier的计数器可以使用reset() 方法重置。所以CyclicBarrier能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次。
- CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得CyclicBarrier阻塞的线程数量。isBroken方法用来知道阻塞的线程是否被中断。
# Semaphore
计数信号量用来控制能够同时访问特定资源的活动的数量,常用于实现简单的生产者消费者模型。
# 示例
生产者线程put元素到队列,若队列满则组赛到队列有空间;消费者不断从队列take获取元素,若队列空则组赛道队列有元素。
class Producer implements Runnable {
private final BlockingQueue queue;
Producer(BlockingQueue q) { queue = q; }
public void run() {
try {
while (true) { queue.put(produce()); }
} catch (InterruptedException ex) { ... handle ...}
}
Object produce() { ... }
}
class Consumer implements Runnable {
private final BlockingQueue queue;
Consumer(BlockingQueue q) { queue = q; }
public void run() {
try {
while (true) { consume(queue.take()); }
} catch (InterruptedException ex) { ... handle ...}
}
void consume(Object x) { ... }
}
class Setup {
void main() {
BlockingQueue q = new SomeQueueImplementation();
Producer p = new Producer(q);
Consumer c1 = new Consumer(q);
Consumer c2 = new Consumer(q);
new Thread(p).start();
new Thread(c1).start();
new Thread(c2).start();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33