CountDownLatch、Semaphone、CyclicBarrier入门

Rothschil 2020-05-27 11:02:00
CountDownLatch,Semaphone,CyclicBarrier

1. CountDownLatch

允许绑定一个或多个线程等待,直到在其他执行中的线程操作完成为止的一种计数器。这种计数器使用的是一种递减
直到当前计数达到零为止,await 方法将阻塞,此后等所有线程将被释放,并且随后的所有await调用将立即返回。

这是一种一次性现象-无法重置计数。如果需要用于重置计数的版本,请考虑使用CyclicBarrier

CountDownLatch 是一种多功能的同步工具,可以用于多种目的。以1的计数初始化的 CountDownLatch 用作简单的on / off锁存器或gate:所有调用线程等待在gate处等待,直到被countDown的线程打开为止。初始化为N的CountDownLatch可以用于使一个线程等待,直到N个线程完成某个动作或某个动作已经完成N次。

CountDownLatch的一个有用属性是,它不需要调用countDown的线程在继续进行操作之前就无需等待计数达到零,它只是防止任何线程经过等待状态,直到所有线程都可以通过。

用法示例:这是一对类,其中一组工作线程使用两个倒计时锁存器

1.1. 构造函数

1
2
3
4
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}

CountDownLatch 构造函数只有一个,参数 count 为绑定大小,核心实现时通过一个同步控件(Sync),维持 AQS 状态表示计数。

Sync的继承结构

AbstractQueuedSynchronizer队列式同步器

AbstractQueuedSynchronizer接口实现

通过图我们可以得知,常用 ReentranLockSemaphoreCountDownLatch等,后面我们会用专门一个篇幅来详细研究下 AbstractQueuedSynchronizer

1.2. 样例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50

public class UserCountDownLatch {

public static void main(String[] args) {
int size = 3;
CountDownLatch countDownLatch = new CountDownLatch(size);
ExecutorService executorService = Executors.newFixedThreadPool(size);
String suffix = "t_";
for (int i = 0; i < size; i++) {
executorService.execute(new CountDownLatchDemo(countDownLatch,suffix+i));
}
try {
countDownLatch.await();
System.out.println("执行完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
executorService.shutdown();
}
}

class CountDownLatchDemo implements Runnable{

private String exeStr;
private CountDownLatch countDownLatch;

public CountDownLatchDemo() {
}

public CountDownLatchDemo(CountDownLatch countDownLatch,String exeStr) {
this.countDownLatch = countDownLatch;
this.exeStr = exeStr;
}

@Override
public void run() {
try {
int id = new Random().nextInt(5);
TimeUnit.SECONDS.sleep(id);
LocalDateTime localDateTime = LocalDateTime.now();
String resutl = exeStr+"_"+id;
System.out.println("当前时间 "+localDateTime.getMinute()+":"+localDateTime.getSecond()+" 当前线程名: "+Thread.currentThread().getName()+ " 结果为: "+ resutl);
countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}

}
}

2. CyclicBarrier

谷歌翻译出来意思循环屏障,可以理解为一种屏障机制,让一组线程任务到达该屏障时被阻塞,直到最后一个线程到达该屏障,被阻塞的线程才能继续执行。形象点的比喻,类似田径比赛时的信号枪,当所有运动员都准备好,信号枪发出信号!

2.1. 构造函数

JDK源码中有两个构造函数,分别为:

其中 parties 绑定给定数量的线程或者任务在屏障被释放之前必须调用等待的线程数;参数barrierAction当屏障被绊倒时执行的命令;如果没有动作,则返回null

2.2. 样例

定义一个CyclicBarrier 绑定三个线程,在创建一个线程池,添加三个线程进去,最后别忘了再来一个shutdown,否则线程会一直挂起。例子比较简单,有兴趣同学可以自己看看!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54

public class UserCyclicBarrier {

public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
ExecutorService executorService = Executors.newFixedThreadPool(3);
CyclicBarrierDemo cdb1 = new CyclicBarrierDemo(cyclicBarrier,"t1");

CyclicBarrierDemo cdb2 = new CyclicBarrierDemo(cyclicBarrier,"t2");

CyclicBarrierDemo cdb3 = new CyclicBarrierDemo(cyclicBarrier,"t3");

executorService.execute(cdb1);
executorService.execute(cdb2);
executorService.execute(cdb3);
System.out.println("执行完成");

executorService.shutdown();
}


}

class CyclicBarrierDemo implements Runnable{

private String exeStr;
private CyclicBarrier cyclicBarrier;

public CyclicBarrierDemo() {
}

public CyclicBarrierDemo(CyclicBarrier cyclicBarrier,String exeStr) {
this.cyclicBarrier = cyclicBarrier;
this.exeStr = exeStr;
}

@Override
public void run() {
try {
int id = new Random().nextInt(5);
TimeUnit.SECONDS.sleep(id);
LocalDateTime localDateTime = LocalDateTime.now();
String resutl = exeStr+"_"+id;
System.out.println("当前时间 "+localDateTime.getMinute()+":"+localDateTime.getSecond()+" 当前线程名: "+Thread.currentThread().getName()+ " 结果为: "+ resutl);
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch(BrokenBarrierException e){
e.printStackTrace();
}

}
}

CyclicBarrier执行效果

3. Semaphone

Semaphore 用于限制可以访问某些资源(物理或逻辑的)的线程数目,他维护了一个许可证集合,有多少资源需要限制就维护多少许可证集合,假如这里有N个资源,那就对应于N个许可证,同一时刻也只能有N个线程访问。一个线程获取许可证就调用 acquire 方法,用完了释放资源就调用 release 方法。

编写一个样例,只维护一个许可证,限制被访问,从另一方面保护了资源的原子性!

3.1. 构造函数

JDK源码中提供两个构造函数,默认为非公平:

1
2
Semaphore(int permits)
Semaphore(int permits, boolean fair)

3.2. 样例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class ExploreSemaphone {

/**
* 定义一个 Semaphone,维护一个许可证
*/
static Semaphore sp = new Semaphore(1);

public static void main(String[] args) {

final ExploreSemaphone exploreSemaphone = new ExploreSemaphone();

ExecutorService es = Executors.newFixedThreadPool(4);

es.execute(new Student(sp,"李克强"));
es.execute(new Student(sp,"习近平"));
es.execute(new Student(sp,"毛泽东"));
}
}


class Student implements Runnable{

private Semaphore sp = null;
private String stuName;

public Student(Semaphore sp,String stuName){
this.sp=sp;
this.stuName=stuName;
}

public void run() {
try {
sp.acquire();
System.out.println("ThreadName is "+ Thread.currentThread().getName()+" 学生名:"+stuName +" 获取许可");
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("ThreadName is "+ Thread.currentThread().getName()+" 学生名:"+stuName +" 许可使用完毕,准备释放");
sp.release();
}
}
}

小结

4. Future

submit、execut区别

5. Queue

5.1. CoucurrentLinkedQueue

同步队列,无界

5.2. 阻塞队列BlockingQueue

5.2.1. ArrayBlockingQueue

有界

5.2.2. LinkedBlockingQueue

无界

5.2.3. SynchronousQueue

容器中不支持直接加元素,需要take线程与add线程阻塞集合

5.2.4. 优先级阻塞队列PriorityBlockingQueue

每次调用take方法获取元素的时候将优先级最高的返回出来!

5.2.5. 延迟阻塞队列DelayQueue