序言

随着并行编程的逐渐流行,Java 自 1.7 版本引入了 Fork/Join 框架 来支持并行处理。到了 Java 8,在此基础上进一步加强了功能,提供了更为便捷的并行流(parallelStream),方便开发者进行并行计算。

是什么

Java 8 提供了更加便捷的方式来处理集合数据,Stream 类就是其中之一。parallelStream() 方法能够充分利用多核 CPU 的计算能力,通过多线程并行处理集合数据,从而加快数据处理的速度。

parallelStream 的核心是利用处理器的多个核心,将任务拆分为多个子任务,并分配到不同的线程上 并行执行,最终将子任务的结果合并为整体结果。与普通的顺序流不同,并行流 并不会保证操作的执行顺序,这意味着处理过程是无序的。

为什么

并行流的引入是为了提高程序的性能,但是选择并行流并不总是最好的选择。在某些情况下,我们需要以特定的顺序执行代码,在这些情况下,我们最好使用顺序流以牺牲性能为代价来执行任务。这两种流之间的性能差异仅在大型程序或复杂项目中才值得关注。对于小规模的项目,它甚至可能不明显。基本上,当顺序流表现不佳时,您应该考虑使用并行流。

如何选择Stream和parallelStream

在从stream和parallelStream方法中进行选择时,我们可以考虑以下几个问题:

  1. 是否需要并行?
  2. 任务之间是否是独立的?是否会引起任何竞态条件?
  3. 结果是否取决于任务的调用顺序?

对于问题1,在回答这个问题之前,需要明确要解决的问题是什么,数据量有多大,计算的特点是什么?并不是所有的问题都适合使用并发程序来求解,比如当数据量不大时,顺序执行往往比并行执行更快。毕竟,准备线程池和其它相关资源也是需要时间的。但是,当任务涉及到I/O操作并且任务之间不互相依赖时,那么并行化就是一个不错的选择。通常而言,将这类程序并行化之后,执行速度会提升好几个等级。

对于问题2,如果任务之间是独立的,并且代码中不涉及到对同一个对象的某个状态或者某个变量的更新操作,那么就表明代码是可以被并行化的。

对于问题3,由于在并行环境中任务的执行顺序是不确定的,因此对于依赖于顺序的任务而言,并行化也许不能给出正确的结果。

适用场景

建议在以下情况下使用 并行流

  1. 执行顺序无关紧要:操作的执行顺序不会影响最终结果。
  2. 元素之间状态独立:一个元素的操作不会影响其他元素的状态。
  3. 数据源稳定:数据源不会发生改变,确保线程安全。

通过合理使用 parallelStream,可以充分发挥多核处理器的性能优势,提高大规模数据处理的效率。

如何正确使用

并行流并不总是比顺序流快。所以正确的姿势使用并行流是尤为重要的,不然适得其反。

决定某个特定情况下是否有必要使用并行流。可以参考一下几点建议

1. 如果有疑问,提前进行测量和检查。并行流有时候会和直觉不一致,所以在考虑选择顺序流还是并行流时,很重要的建议就是用适当的基准来检查其性能。

2. 留意装箱。自动装箱和拆箱操作会大大降低性能。Java 8中有原始类型流(IntStream、LongStream和DoubleStream)来避免这种操作,尽量使用这些流进行操作。

3. 有些操作本身在并行流上的性能就比顺序流差。特别是limit和findFirst等依赖于元素顺序的操作,它们在并行流上执行的代价非常大。例如,findAny会比findFirst性能好,因为它不一定要按顺序来执行。你总是可以调用unordered方法来把有序流变成无序流。那么,如果你需要流中的N个元素而不是专门要前N个的话,对无序并行流调用limit可能会比单个有序流(比如数据源是一个List)更高效。

4. 考虑流的操作流水线的总计算成本。设N是要处理的元素的总数,Q是一个元素通过流水线的大致处理成本,则N*Q就是这个对成本的一个粗略的定性估计。Q值较高就意味着使用并行流时性能好的可能性比较大。

5. 对于较小的数据量,选择并行流几乎从来都不是一个好的决定。并行处理少数几个元素的好处还抵不上并行化造成的额外开销。

6. 考虑流背后的数据结构是否易于分解。例如,ArrayList的拆分效率比LinkedList高得多,因为前者用不着遍历就可以平均拆分,后者则必须遍历。另外,用range工厂方法创建的原始类型流也可以快速分解。可以参考一下表格:

image

7. 流自身的特点以及流水线中的中间操作修改流的方式,都可能会改变分解过程的性能。例如,一个SIZED流可以分成大小相等的两部分,这样每个部分都可以比较高效地并行处理,但筛选操作可能丢弃的元素个数无法预测,从而导致流本身的大小未知。

8. 还要考虑终端操作中合并步骤的代价是大是小(例如Collector中的combiner方法)。如果这一步代价很大,那么组合每个子流产生的部分结果所付出的代价就可能会超出通过并行流得到的性能提升。

注意事项

  1. 因为是并行流,所以所涉及到的数据结构需要使用线程安全的。例如
    1
    2
    3
    4
    5
    6
    listByPage.parallelStream().forEach(str-> {
    //使用线程安全的数据结构
    //ConcurrentHashMap
    //CopyOnWriteArrayList
    //等等进行操作
    });
  2. 线程关联的ThreadLocal将会失效。

由于开头提到的主线程有可能参与到parallelStream中的任务处理的过程中。因此如果我们处理的任务方法中包含对ThreadLocal的处理,可能除主线程之外的所有线程都获取不到自己的线程局部变量,加之ForkJoinPool中的线程是反复使用的,线程关联的ThreadLocal会发生共用的情况。

所以我的建议是,parallelStream中就不要使用ThreadLocal了,要么在任务处理方法中,第一行先进行ThreadLocal.set(),之后再由ThreadLocal.get()获取到自己的线程局部变量

  1. 使用并行流时,不要使用collectors.groupingBy、collectors.toMap

使用并行流时,不要使用collectors.groupingBy、collectors.toMap,替代为collectors.groupingByConcurrent、collectors.toConcurrentMap,或直接使用串行流。

原因,并行流执行时,通过操作Key来合并多个map的操作比较昂贵。详细可以查看官网介绍。

https://docs.oracle.com/javase/tutorial/collections/streams/parallelism.html#concurrent_reduction

  1. 使用parallelStream也不一定会提升性能

在CPU资源紧张的时候,使用并行流可能会带来频繁的线程上下文切换,导致并行流执行的效率还没有串行执行的效率高。

源码

1
2
3
4
5
6
7
8
9
/**
* @return a possibly parallel {@code Stream} over the elements in this
* collection
* @since 1.8
*/

default Stream<E> parallelStream() {
return StreamSupport.stream(spliterator(), true);
}

从上面代码中注释的@return a possibly parallel可以看得出来,parallelStream()并不是一定返回一个并行流,有可能parallelStream()全是由主线程顺序执行的。因此使用parallelStream时要特别注意。

原理分析

Java 中使用 Stream 流进行多线程处理非常方便,尤其是通过 parallelStream() 方法。例如:

1
2
3
list.parallelStream().forEach(s -> {
// 后续业务处理
});

如何实现多线程处理?

parallelStream 的底层实现依赖于 Fork/Join 框架,其核心是通过 ForkJoinPool 线程池来调度任务。执行并行操作时,会有 主线程ForkJoinPool 中的 worker 线程 共同参与并行处理。

Fork/Join 框架的基础

parallelStream 是基于 ForkJoinPool 实现的,而 ForkJoinPool 是 Java 提供的专门用于 并行任务执行 的框架。ForkJoinPool 本质上是一个线程池,它实现了 ExecutorService 接口,因此与传统线程池有着密切的关系。

ForkJoinPool 的特点

  • 任务拆分与合并:通过 分治策略 将大任务拆分成小任务(Fork),然后并行执行,最终将结果合并(Join)。
  • 线程调度:ForkJoinPool 内部采用 工作窃取算法,确保线程高效利用。
  • 线程组成:ForkJoinPool 包含主线程和若干个 worker 线程,这些线程共同执行并行任务。

ForkJoinPool 与 ExecutorService 的关系

ForkJoinPool 实现了 ExecutorService 接口,因此是 Executor 框架的一部分。它为并行流的任务调度提供了基础支持,区别于普通线程池的在于:

  • ForkJoinPool 专为 分治任务 设计。
  • 内部使用 分治策略工作窃取算法 来提高并行执行效率。

通过 ForkJoinPool,parallelStream 能够将任务拆分成多个子任务,并分发给多个线程执行,从而实现高效的多线程处理。

ForkJoinPool和ExecutorService的继承关系如图所示——源自该篇文章Java8 并行流(parallelStream)原理分析及注意事项

image

简单来说,parallelStream 之所以能够实现多线程并行处理,是因为它借助了 Fork/Join 框架,并使用 ForkJoinPool 线程池进行任务的拆分和调度。在执行过程中,主线程与 ForkJoinPool 中的 worker 线程会共同参与任务的执行,提升计算效率。

分治策略

Fork/Join 框架 的核心思想是 分而治之,用于高效地处理大型任务。具体过程如下:

对于一个较大的任务,首先将其拆分(Fork)成多个更小的子任务,例如 task1 和 task2。

  • task1 分配给线程 thread1 进行处理。
  • task2 分配给线程 thread2 进行处理。

在处理过程中:

  • 如果 thread1 发现 task1 仍然过大,便会进一步将其拆分为更小的子任务,比如 task1.1task1.2
  • thread2 如果判断 task2 的任务量足够小,则会直接执行该任务并得到结果 result2

当拆分的子任务执行完毕后,开始进行合并(Join):

  • task1.1task1.2 的结果会先合并成 result1
  • 最终,result1result2 再合并,得到整个任务的最终结果。

总结:
Fork/Join 框架通过将大任务不断拆分为小任务,分发给多个线程并行执行,最后合并各个子任务的结果,从而高效地完成大规模任务的处理。这种机制充分利用了多核处理器的计算能力,提高了任务执行效率。

下图源自该篇文章Java8 并行流(parallelStream)原理分析及注意事项

image

工作窃取算法

ForkJoinPool 的核心之一是 工作窃取算法(Work-Stealing),用于高效地利用线程资源,最大化 CPU 使用率。工作窃取算法通过为每个线程分配独立的任务队列,并允许线程在完成自身任务后从其他线程的队列中“窃取”任务,达到负载均衡。具体过程如下:

原理:
  1. 任务分配:

    • 当一个大任务进入 ForkJoinPool 时,框架将其拆分成多个小任务。
    • 小任务均匀分配到多个线程对应的队列中。
  2. 任务执行:

    • 每个线程从自己的任务队列头部取任务执行。
    • 如果任务过大,线程会进一步拆分(Fork)为更小的子任务,将这些子任务放入自己的任务队列。
  3. 任务窃取:

    • 如果某个线程完成了自己的任务,且队列已空,则它会尝试从其他线程的队列末端窃取任务执行。
    • 这种窃取行为避免了线程闲置,提升了整体性能。
示例分析:

假设有两个线程,thread1thread2,它们分别处理各自的任务队列:

`thread1` 完成了自己的任务后发现队列为空。
此时,`thread1` 从 `thread2` 的任务队列末端窃取未完成的任务,继续执行。

如果 thread2 的任务队列也空了,则说明所有任务都已完成。

优点:
  1. 高效利用资源:线程通过窃取任务避免了资源闲置,充分利用 CPU。
  2. 减少锁竞争:每个线程有独立的队列,窃取任务时的竞争开销较低。
  3. 动态负载均衡:通过窃取机制实现了线程间的负载均衡,适应任务的不均匀分布。
总结:

工作窃取算法通过独立队列与窃取机制,实现线程间的高效协作。这种设计特别适合任务量大、任务拆分粒度小且独立的场景,是现代并行计算框架中常用的优化手段。

实例演示

提交有返回值的任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.IntStream;

/**
* @Description 提交有返回值的任务
*/

public class ForkJoinRecursiveTask {

/**
* 最大计算数
*/
private static final int MAX_THRESHOLD = 100;

public static void main(String[] args) {
//创建ForkJoinPool
ForkJoinPool pool = new ForkJoinPool();
//异步提交RecursiveTask任务
ForkJoinTask<Integer> forkJoinTask = pool.submit(new CalculatedRecursiveTask(0, 1000));
try {
//根据返回类型获取返回值
Integer result = forkJoinTask.get();
System.out.println("执行结果为:" + result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} finally {
pool.shutdown();
}
}

private static class CalculatedRecursiveTask extends RecursiveTask<Integer> {
private final int start;
private final int end;

public CalculatedRecursiveTask(int start, int end) {
this.start = start;
this.end = end;
}

@Override
protected Integer compute() {
//判断计算范围,如果小于等于5,那么一个线程计算即可,否则进行分割
if ((end - start) <= MAX_THRESHOLD) {
//返回[start,end]的总和
return IntStream.rangeClosed(start, end).sum();
} else {
//任务分割
int middle = (end + start) / 2;
CalculatedRecursiveTask task1 = new CalculatedRecursiveTask(start, middle);
CalculatedRecursiveTask task2 = new CalculatedRecursiveTask(middle + 1, end);
//执行
task1.fork();
task2.fork();
//等待返回结果
return task1.join() + task2.join();
}
}
}
}

执行结果如下——源自该篇文章Java8 并行流(parallelStream)原理分析及注意事项

image

提交无返回值的任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

/**
* @Description 提交无返回值的任务
*/

public class ForkJoinRecursiveAction {

/**
* 最大计算数
*/
private static final int MAX_THRESHOLD = 100;
private static final AtomicInteger SUM = new AtomicInteger(0);

public static void main(String[] args) throws InterruptedException {
//创建ForkJoinPool
ForkJoinPool pool = new ForkJoinPool();
//异步提交RecursiveAction任务
pool.submit(new CalculatedRecursiveTask(0, 1000));
//等待3秒后输出结果,因为计算需要时间
pool.awaitTermination(1, TimeUnit.SECONDS);
System.out.println("结果为:" + SUM);
pool.shutdown();
}

private static class CalculatedRecursiveTask extends RecursiveAction {
private final int start;
private final int end;

public CalculatedRecursiveTask(int start, int end) {
this.start = start;
this.end = end;
}

@Override
protected void compute() {
//判断计算范围,如果小于等于5,那么一个线程计算即可,否则进行分割
if ((end - start) <= MAX_THRESHOLD) {
//因为没有返回值,所有这里如果要获取结果,需要存入公共的变量中
SUM.addAndGet(IntStream.rangeClosed(start, end).sum());
} else {
//任务分割
int middle = (end + start) / 2;
CalculatedRecursiveTask task1 = new CalculatedRecursiveTask(start, middle);
CalculatedRecursiveTask task2 = new CalculatedRecursiveTask(middle + 1, end);
//执行
task1.fork();
task2.fork();
}
}
}
}

执行结果如下——源自该篇文章Java8 并行流(parallelStream)原理分析及注意事项

image

虽然ForkJoin实际的代码非常复杂,但是通过这个例子应该了解到ForkJoinPool底层的分治算法和工作窃取原理。ForkJoin不仅在Java8之后的Stream中广泛使用。golang等其他语言的协程机制,也是采用类似的原理来实现的。

参考 & 鸣谢

  1. Java8 并行流(parallelStream)原理分析及注意事项-CSDN博客
  2. 深入浅出parallelStream_parralstream-CSDN博客