<>ForkJoin
什么是ForkJoin
ForkJion 在JDK1.7,并行执行任务!提高效率。大数据量!
大数据:Map Reduce(将大任务拆分为小任务)
ForkJion 特点:工作窃取
这个里面维护的都是双端队列,A 线程没有执行完,B线程执行完了,B 线程则去 执行A 中的(红框)未执行完任务
ForkJion
代码案例:
package com.forkjoin; import java.util.concurrent.ExecutionException; import
java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream; /** * 3000 6000(ForkJoin) 9000(Stream并行流) */
public class Test { public static void main(String[] args) throws
ExecutionException, InterruptedException { //test1();//8039 //test2(); //4369
test3(); //152 } public static void test1(){ Long sum =0L; long start = System.
currentTimeMillis(); for (Long i = 1L; i < 10_0000_0000; i++) { sum+=i; } long
end= System.currentTimeMillis(); System.out.println("sum="+sum+"时间:"+(end -start
)); } public static void test2() throws ExecutionException, InterruptedException
{ Long sum =0L; long start = System.currentTimeMillis(); //forkJoin 池子
ForkJoinPool forkJoinPool = new ForkJoinPool(); //实例化业务方法 ForkJoinTask<Long>
task= new ForkJoinDemo(0L,10_0000_0000L); //在池子中执行业务 ForkJoinTask<Long> submit =
forkJoinPool.submit(task); sum = submit.get(); long end = System.
currentTimeMillis(); System.out.println("sum="+sum+"时间:"+(end -start)); } public
static void test3() throws ExecutionException, InterruptedException { Long sum =
0L; long start = System.currentTimeMillis(); //Stream 并行流 range
不包含后面的,rangeClosed 包含后面的 sum = LongStream.rangeClosed(0L, 10_0000_0000L).
parallel().reduce(0, Long::sum); long end = System.currentTimeMillis(); System.
out.println("sum="+sum+"时间:"+(end -start)); } } package com.forkjoin; import
java.util.concurrent.RecursiveTask; /** * * 求和计算的任务! * * 3000, 6000(ForkJoin)
9000(Stream 并行流) * 如何使用forkJoin * 1.forkJoinPool 通过它来执行 *
2.计算任务forkjoinPoolexecute * 3.计算类要继承ForkJoinTask * * Recursive : 发音 rekusive *
*/ public class ForkJoinDemo extends RecursiveTask<Long> { private Long start;
private Long end; //临界值 private Long temp = 10000L; public ForkJoinDemo(Long
start, Long end) { this.start = start; this.end = end; } public static void main
(String[] args) { // int sum =0; // for (int i = 0; i < 10_0000_0000; i++) { //
sum+=i; // } // System.out.println(sum); // // } public void test(){ //100 -50
>50 if((end -start) >temp){ int sum =0; for (int i = 0; i < 10_0000_0000; i++) {
sum+=i; } System.out.println(sum); } } //计算的返回值就是 RecursiveTask 的泛型 @Override
protected Long compute() { //如果范围比 1 万还小我们就不用分支合并 if((end -start) <temp){ Long
sum=0L; for (long i =start; i <=end; i++) { sum+=i; } return sum; }else{
//forkjoin: 一个任务拆分为多个任务 ,像递归 long middle = (start + end) / 2; //中间值 ForkJoinDemo
task1= new ForkJoinDemo(start, middle); task1.fork(); //拆分任务,把任务压入到线程队列
ForkJoinDemo task2 = new ForkJoinDemo(middle+1, end); task2.fork(); return task1
.join() + task2 .join(); } } }