Appearance
Java 基础:流操作
1. 迭代与流的差异
普通迭代:
Java
int count = 0;
for (String w : words) {
if (w.length() > 12) count++;
}1
2
3
4
2
3
4
使用流时:
Java
long count = words.stream()
.filter(w -> w.length() > 12)
.count();1
2
3
2
3
流表面上看起来和集合很类似,都可以让我们转换和获取数据。但是,它们之间存在着显著的差异:
- 流并不存储其元素。这些元素可能存储在底层的集合中,或者是按需生成的;
- 流的操作不会修改其数据源。例如,
filter方法不会从流中移除元素,而是会生成一个新的流,其中不包含被过滤掉的元素; - 流的操作是尽可能惰性执行的。这意味着直至需要其结果时,操作才会执行。例如,如果我们只想查找前 5 个长单词而不是所有长单词,那么
filter方法就会在匹配到第 5 个单词后停止过滤。因此,我们甚至可以操作无限流;
Note
因为中间的流操作是惰性的,所以在终止操作得以执行时,集合有可能已经发生了变化,但它仍旧可以工作(尽管我们不推荐这种做法)。例如:
JavaList<String> wordList = ...; Stream<String> words = wordList.stream(); wordList.add("END"); long n = words.distinct().count();1
2
3
4
2. 流的创建
Collection对象:可以Collection.stream方法将任何集合转换为一个流。数组:
使用静态的
Stream.of方法:JavaStream<String> words = Stream.of(contents.split("\\PL+"));静态的
Stream.empty方法可以创建不包含任何元素的流:JavaStream<String> silence = Stream.empty(); // Generic type <String> is inferred; same as Stream.<String>empty()使用
Arrays.stream(array, from, to)可以用数组中的一部分元素来创建一个流;
Stream接口有两个用于创建无限流的静态方法:generate方法支持从Supplier<T>生成一个流:常量流:
JavaStream<String> echos = Stream.generate(() -> "Echo");随机流:
JavaStream<Double> randoms = Stream.generate(Math::random);iterate接受一个 “种子” 值,以及一个UnaryOperation<T>函数,并反复地将该函数应用到之前的结果上,例如:JavaStream<BigInteger> integers = Stream.iterate(BigInteger.ZERO, n -> n.add(BigInteger.ONE));如果要产生一个有限序列,则需要添加一个谓词来描述迭代应该如何结束:
Javavar limit = new BigInteger("10000000"); Stream<BigInteger> integers = Stream.iterate(BigInteger.ZERO, n -> n.compareTo(limit) < 0, n -> n.add(BigInteger.ONE));1
2
3
4
5
Stream.ofNullable方法会用一个对象来创建一个非常短的流。如果该对象为null,那么这个流的长度就为0;否则,这个流的长度为1,即只包含该对象。
除了以上方式实际上还有大量的方法可以产生流。例如,Pattern 类有一个 splitAsStream 方法,它会按照某个正则表达式来分割一个 CharSequence 对象。可以使用下面的语句来将一个字符串分割为一个个的单词:
Java
Stream<String> words = Pattern.compile("\\PL+").splitAsStream(contents);Scanner.tokens 方法会产生一个扫描器的符号流:
Java
Stream<String> words = new Scanner(contents).tokens();静态的 Files.lines 方法会返回一个包含了文件中所有行的 Stream:
Java
try (Stream<String> lines = Files.lines(path)) {
// Process lines
}1
2
3
2
3
如果持有的 Iterable 对象不是集合,那么可以通过下面的调用将其转换为一个流:
Java
StreamSupport.stream(iterable.spliterator(), false);如果是 Iterator 对象,那么可以使用下面的语句:
Java
StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false);3. filter、map 和 flatMap 方法
流的转换会产生一个新的流,它的元素派生自另一个流中的元素。例如,filter 转换会产生一个新流,它的元素与某种条件相匹配:
Java
List<String> words = ...;
Stream<String> longWords = words.stream().filter(w -> w.length() > 12);1
2
2
若想要按照某种方式来转换流中的值,可以使用 map 方法并传递执行该转换的函数:
Java
Stream<String> lowercaseWords = words.stream().map(String::toLowerCase);假设我们有一个函数,它返回的不是一个值,而是一个包含众多值的流。例如,下面示例的方法会将字符串转换为字符串流:
Java
public static Stream<String> codePoints(String s) {
var result = new ArrayList<String>();
int i = 0;
while (i < s.length()) {
int j = s.offsetByCodePoints(i, 1);
result.add(s.substring(i, j));
i = j;
}
return result.stream();
}1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
此时我们将 codePoints 方法映射到一个字符串流上:
Java
Stream<Stream<String>> result = words.stream().map(w -> codePoints(w));那么我们会得到一个包含流的流(Stream<Stream<String>>),而这通常不是我们所需要的,为了将其展平为单个流(Stream<String>),此时便可以使用 flatMap 方法而不是 map 方法:
Java
Stream<String> flatResult = words.stream().flatMap(w -> codePoints(w)); // Calls codePoints on each word and flattens the results4. limit、skip、takeWhile、dropWhile 和 concat 方法
调用 stream.limit(n) 会返回一个新的流,它在 n 个元素之后结束(如果原来的流比 n 短,那么就会在该流结束时结束)。这个方法对于裁剪无限流的尺寸特别有用。例如:
Java
Stream<Double> randoms = Stream.generate(Math::random).limit(100);调用 stream.skip(n) 正好相反,它会丢弃前 n 个元素:
Java
Stream<String> words = Stream.of(contents.split("\\PL+")).skip(1);stream.takeWhile(predicate) 调用会收集所有开头符合条件的元素:
Java
Stream<String> initialDigits = codePoints(str).takeWhile(s -> "0123456789".contains(s));stream.dropWhile(predicate) 方法的做法正好相反,它会丢弃所有开头符合条件的元素:
Java
Stream<String> withoutInitialWhiteSpace = codePoints(str).dropWhile(s -> s.trim().length() == 0);用 stream 类的静态 concat 方法可以将两个流连接起来:
Java
Stream<String> combined = Stream.concat(codePoints("Hello"), codePoints("World")); // Yields the stream ["H", "e", "l", "l", "o", "W", "o", "r", "l", "d"]Note:当然,第一个流不应该是无限的,否则第二个流永远都不会有机会处理。
5. distinct、sorted 和 peek 方法
distinct 方法会返回一个流,它的元素是从原有流中产生的,即原来的元素按照同样的顺序剔除重复元素后产生的。这些重复元素并不一定是毗邻的:
Java
Stream<String> uniqueWords = Stream.of("merrily", "merrily", "merrily", "gently").distinct(); // Only one "merrily" is retained对于流的排序,有多种 sorted 方法的变体可用。其中一种用于操作 Comparable 元素的流,而另一种可以接受一个 Comparator。下面,我们对字符串排序,使得最长的字符串排在最前面:
Java
Stream<String> longestFirst = words.stream().sorted(Comparator.comparing(String::length).reversed());最后,peek 方法会产生另一个流,它的元素与原来流中的元素相同,但是在每次获取一个元素时,都会调用一个函数。这对于调试来说很方便:
Java
Object[] powers = Stream.iterate(1.0, p -> p * 2)
.peek(e -> System.out.println("Fetching " + e))
.limit(20).toArray();1
2
3
2
3
6. 简单约简
约简是一种终结操作(terminal operation),它们会将流约简为可以在程序中使用的非流值。
一些常见的简单约简,比如 count 方法返回流中元素的数量,max 和 min 它们分别返回最大值和最小值。不过需要注意的是,这些方法返回的是一个 Optional<T> 类型的值,它要么在其中包装了答案,要么表示没有任何值。
findFirst 返回非空集合中的第一个值。它通常在与 filter 组合使用:
Java
Optional<String> startsWithQ = words.filter(s -> s.startsWith("Q")).findFirst();如果不强调使用第一个匹配,而是使用任意的匹配都可以,那么就可以使用 findAny 方法。这个方法在并行处理流时很有效,因为流可以报告任何它找到的匹配而不是被限制为必须报告第一个匹配:
Java
Optional<String> startsWithQ = words.parallel().filter(s -> s.startsWith("Q")).findAny();如果只想知道是否存在匹配,那么可以使用 anyMatch:
Java
boolean aWordStartsWithQ = words.parallel().anyMatch(s -> s.startsWith("Q"));还有 allMatch 和 noneMatch 方法,它们分别在所有元素和没有任何元素匹配谓词的情况下返回 true。这些方法也可以通过并行运行而获益。
7. Optional 类型
有效地使用 Optional 的关键是要使用这样的方法:它在值不存在的情况下会产生一个可替代物,而只有在值存在的情况下才会使用这个值。
Tip
以下是一些有关
Optional的正确用法指南:
Optional类型的变量永远都不应该为null;- 不要使用
Optional类型的字段(field),因为其代价是额外多出一个对象。在类的内部,使用null表示缺失的字段更易于操作;- 不要在集合中放置
Optional对象,并且不要将它们用作map的键;
7.1. orElse、orElseGet 和 orElseThrow 方法
当值不存在时返回一个默认值:
Java
String result = optionalString.orElse(""); // The wrapped string, or "" if none当值不存在时通过 Supplier<T> 函数获取一个替代值:
Java
String result = optionalString.orElseGet(() -> System.getProperty("myapp.default")); // The function is only called when needed当值不存在时抛出一个异常:
Java
String result = optionalString.orElseThrow(IllegalStateException::new); // Supply a method that yields an exception object7.2. ifPresent 和 ifPresentOrElse 方法
ifPresent 方法会接受一个函数。如果可选值存在,那么它会被传递给该函数。否则,不会发生任何事情。
Java
optionalValue.ifPresent(v -> Process v);如果想要在可选值存在时执行一种动作,在可选值不存在时执行另一种动作,可以使用 ifPresentOrElse:
Java
optionalValue.ifPresentOrElse(
v -> System.out.println("Found " + v),
() -> logger.warning("No match"));1
2
3
2
3
7.3. map 和 flatMap 方法
通过 map 方法可以产生一个 Optional,如果当前的 Optional 的值存在,那么所产生的 Optional 的值是通过将给定的函数应用于当前的 Optional 的值而得到的;否则,产生一个空的 Optional。
Java
Optional<String> transformed = optionalString.map(String::toUpperCase);flatMap 方法与 map 方法类似,但它要求传入的函数返回一个 Optional 对象而不是直接返回一个值。这样,flatMap 会自动展平嵌套的 Optional 结构,将最终的结果包装在一个单层的 Optional 对象中:
Java
public static Optional<Double> inverse(Double x) {
return x == 0 ? Optional.empty() : Optional.of(1 / x);
}
public static Optional<Double> squareRoot(Double x) {
return x < 0 ? Optional.empty() : Optional.of(Math.sqrt(x));
}
Optional<Double> result = Optional.of(-4.0).flatMap(Demo::inverse).flatMap(Demo::squareRoot);1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
7.4. filter 和 or 方法
通过 filter 方法可以产生一个 Optional,如果当前的 Optional 的值满足给定的谓词条件,那么所产生的 Optional 的值就是当前 Optional 的值;否则,产生一个空 Optional。
Java
Optional<String> transformed = optionalString.filter(s -> s.length() >= 8).map(String::toUpperCase);用 or 方法将空 Optional 替换为一个可替代的 Optional,这个可替代值将以惰性方式计算:
Java
Optional<String> result = optionalString.or(() -> alternatives.stream().findFirst());7.5. 创建 Optional
可以使用 Optional.of(obj)、Optional.ofNullable(obj) 或 Optional.empty() 来创建一个 Optional 对象。其中 Optional.ofNullable(obj) 会在 obj 不为 null 的情况下返回 Optional.of(obj),否则会返回 Optional.empty()。
7.6. 将 Optional 转换为流
stream 方法会将一个 Optional<T> 对象转换为一个具有 0 个或 1 个元素的 Stream<T> 对象。
考虑下面这段代码:
Java
Stream<String> ids = ...;
Stream<User> users = ids.map(Users::lookup)
.filter(Optional::isPresent)
.map(Optional::get);1
2
3
4
2
3
4
我们可以优雅地将其改写成:
Java
Stream<User> users = ids.map(Users::lookup)
.flatMap(Optional::stream);1
2
2
Note
假设
Users.classicLookup(id)返回一个User对象或null值,而不是Optional<User>。当然,我们可以简单地使用filter方法来过滤掉null值:JavaStream<User> users = ids.map(Users::classicLookup).filter(Objects::nonNull);但是,如果你更习惯用
flatMap方法,那么你可以写成下面这样:JavaStream<User> users = ids.flatMap(id -> Stream.ofNullable(Users.classicLookup(id)));或者这样:
JavaStream<User> users = ids.map(Users::classicLookup).flatMap(Stream::ofNullable);
8. 收集结果
当处理完流之后,通常会想要查看其结果。此时可以调用 iterator 方法,它会产生用来访问元素的旧式风格的迭代器。或者,可以调用 forEach 方法,将某个函数应用于每个元素:
Java
stream.forEach(System.out::println);在并行流上,forEach 方法会以任意顺序遍历各个元素。如果想要按照流中的顺序来处理它们,可以调用 forEachOrdered 方法。当然,这个方法会丧失并行处理的部分甚至全部优势。
但是,更常见的情况是,我们想要将结果收集到数据结构中。此时,可以调用 toArray,获得由流的元素构成的数组。因为无法在运行时创建泛型数组,所以表达式 stream.toArray() 会返回一个 Object[] 数组。如果想要让数组具有正确的类型,可以将其传递到数组构造器中:
Java
String[] result = stream.toArray(String[]::new); // stream.toArray() has type Object[]针对将流中的元素收集到另一个目标中,有一个便捷方法 collect 可用,它会接受一个 Collector 接口的实例。收集器是一种收集众多元素并产生单一结果的对象,Collectors 类提供了大量用于生成常见收集器的工厂方法:
Java
List<String> result1 = stream.collect(Collectors.toList());
Set<String> result2 = stream.collect(Collectors.toSet());
TreeSet<String> result3 = stream.collect(Collectors.toCollection(TreeSet::new));
String result4 = stream.collect(Collectors.joining());
String result5 = stream.collect(Collectors.joining(", "));
String result6 = stream.map(Object::toString).collect(Collectors.joining(", "));1
2
3
4
5
6
2
3
4
5
6
如果想要将流的结果约简为总和、数量、平均值、最大值或最小值,可以使用 summarizing(Int|Long|Double) 方法中的某一个。这些方法会接受一个将流对象映射为数值的函数,产生类型为 (Int|Long|Double)SumaryStatistics 的结果,同时计算总和、数量、平均值、最大值和最小值。
Java
IntSummaryStatistics summary = stream.collect(Collectors.summarizingInt(String::length));
double averageWordLength = summary.getAverage();
double maxWordLength = summary.getMax();1
2
3
2
3
9. 收集到映射中
假设我们有一个 Stream<Person>,并且想要将其元素收集到一个映射中,这样后续就可以通过它们的 ID 来查找人员了。Collectors.toMap 方法有两个函数参数,它们用来产生映射的键和值。例如:
Java
Map<Integer, Person> idToPerson = people.collect(Collectors.toMap(Person::getId, Function.identity()));Note
Function.identity方法返回一个总是直接将输入参数返回的函数:Javastatic <T> Function<T, T> identity() { return t -> t; }
还可以通过提供第 3 个函数参数来解决已有值和新值的冲突(多个元素具有相同的键),并决定键对应的值。这个函数应该返回已有值、新值或它们的组合:
Java
Stream<Locale> locales = Stream.of(Locale.getAvailableLocales());
Map<String, String> languageNames = locales.collect(
Collectors.toMap(
Locale::getDisplayLanguage,
loc -> loc.getDisplayLanguage(loc),
(existingValue, newValue) -> existingValue));1
2
3
4
5
6
2
3
4
5
6
我们不关心同一种语言是否可能会重复出现(例如,德国和瑞士都使用德语),因此我们只记录第一项。
如果想要得到 TreeMap,那么可以将构造器作为第 4 个参数来提供。你必须提供一种合并函数:
Java
Map<Integer, Person> idToPerson = people.collect(
Collectors.toMap(
Person::getId,
Function.identity(),
(existingValue, newValue) -> { throw new IllegalStateException(); },
TreeMap::new));1
2
3
4
5
6
2
3
4
5
6
10. 群组和分区
将具有相同特性的值群聚成组是非常常见的,并且 groupingBy 方法直接就支持它。
Java
Map<String, List<Locale>> countryToLocales = locales.collect(Collectors.groupingBy(Locale::getCountry));函数 Locale::getCountry 是群组的分类函数,你现在可以查找给定国家代码对应的所有地点了,例如:
Java
List<Locale> swissLocales = countryToLocales.get("CH"); // Yields locales de_CH, fr_CH, it_CH and maybe more当分类函数是断言函数(即返回 boolean 值的函数)时,流的元素可以分为两个列表:该函数返回 true 的元素和其他的元素。在这种情况下,使用 partitioningBy 比使用 groupingBy 更高效。例如,在下面的代码中,我们将所有 local 分成了使用英语和使用所有其他语言的两类:
Java
Map<Boolean, List<Locale>> englishAndOtherLocales = locales.collect(Collectors.partitioningBy(l -> l.getLanguage().equals("en")));
List<Locale> englishLocales = englishAndOtherLocales.get(true);1
2
2
11. 下游收集器
groupingBy 方法会产生一个映射表,它的每个值都是一个列表。如果想要获得集而不是列表,就需要提供一个 “下游收集器”,可以使用上一节中看到的 Collectors.toSet 收集器:
Java
Map<String, Set<Locale>> countryToLocaleSet = locales.collect(groupingBy(Locale::getCountry, toSet()));Java 提供了多种可以将收集到的元素约简为数字的收集器:
Collectors.counting计算收集到的元素的个数:JavaMap<String, Long> countryToLocaleCounts = locales.collect(groupingBy(Locale::getCountry, counting()));summing(Int|Long|Double)会接受一个函数作为参数,将该函数应用到下游元素中,并产生它们的和:JavaMap<String, Integer> stateToCityPopulation = cities.collect(groupingBy(City::getState, summingInt(City::getPopulation)));maxBy和minBy会接受一个比较器,并分别产生下游元素中的最大值和最小值:JavaMap<String, Optional<City>> stateToLargestCity = cities.collect(groupingBy(City::getState, maxBy(Comparator.comparing(City::getPopulation))));
collectingAndThen 收集器在收集器后面添加了一个最终处理步骤。例如,如果我们想要知道有多少不同的结果,那么就可以将它们收集到一个集中,然后计算其尺寸:
Java
Map<Character, Integer> stringCountsByStartingLetter = strings.collect(groupingBy(s -> s.charAt(0), collectingAndThen(toSet(), Set::size)));mapping 收集器的做法正好相反,它会将一个函数应用于收集到的每个元素,并将结果传递给下游收集器。
Java
Map<Character, Set<Integer>> stringLengthsByStartingLetter = strings.collect(groupingBy(s -> s.charAt(0), mapping(String::length, toSet())));对于下面这段代码:
Java
Map<String, Set<String>> countryLanguageSets = locales.collect(
Collectors.toMap(
Locale::getDisplayCountry,
l -> Collections.singleton(l.getDisplayLanguage()),
(a, b) -> { // Union of a and b
var union = new HashSet<String>(a);
union.addAll(b);
return union;
}
)
);1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
通过 mapping 我们可以将其优化为:
Java
Map<String, Set<String>> countryToLanguages = locales.collect(
groupingBy(
Locale::getDisplayCountry,
mapping(Locale::getDisplayLanguage, toSet())
)
);1
2
3
4
5
6
2
3
4
5
6
还有一个 flatMapping 方法,可以与返回流的函数一起使用。
如果群组和映射函数的返回值为 int、long 或 double, 那么可以将元素收集到汇总统计对象中,例如 Collectors.summarizingInt:
Java
Map<String, IntSummaryStatistics> stateToCityPopulationSummary = cities.collect(groupingBy(City::getState, summarizingInt(City::getPopulation)));filtering 收集器会将一个过滤器应用到每个组上,例如:
Java
Map<String, Set<City>> largeCitiesByState = cities.collect(
groupingBy(
City::getState,
filtering(c -> c.getPopulation() > 500000,
toSet())
)
); // States without large cities have empty sets1
2
3
4
5
6
7
2
3
4
5
6
7
将收集器组合起来是一种很强大的方式,但是它也可能会导致产生非常复杂的表达式。最佳用法是与 groupingBy 和 partitioningBy 一起处理 “下游的” 映射表中的值。否则,应该直接在流上应用诸如 map、reduce、count、max 或 min 这样的方法。
12. 约简操作
reduce 方法是一种用于从流中计算某个值的通用机制,其最简单的形式将接受一个二元函数,并从前两个元素开始持续应用它。如果该函数是求和函数,那么就很容易解释这种机制:
Java
List<Integer> values = ...;
Optional<Integer> sum = values.stream().reduce((x, y) -> x + y);1
2
2
在上面的情况中,reduce 方法会计算 Optional,因为没有任何有效的结果。
如果要用并行流来约简,那么这项约简操作必须是可结合的,即组合元素时使用的顺序不会产生任何影响。在数学标记法中,
通常,会有一个幺元值 0 是加法的幺元值。由此,我们可以使用第 2 种形式的 reduce:
Java
List<Integer> values = ...;
Integer sum = values.stream().reduce(0, (x, y) -> x + y); // Computes 0 + v0 + v1 + v2 + ...1
2
2
如果流为空,则会返回幺元值,你就再也不需要处理 Optional 类了。
13. 基本类型流
到目前为止,我们都是将整数收集到 Stream<Integer> 中,尽管很明显,但是将每个整数都包装到包装器对象中却是很低效的。对其他基本类型来说,情况也是一样,这些基本类型是 double、float、long、short、char、byte 和 boolean。流库中具有专门的类型 IntStream、LongStream 和 DoubleStream 用来直接存储基本类型值,而无须使用包装器。如果想要存储 short、char、byte 和 boolean,可以使用 IntStream,而对于 float,可以使用 DoubleStream。
为了创建 IntStream,需要调用 IntStream.of 和 Arrays.stream 方法:
Java
IntStream stream = IntStream.of(1, 1, 2, 3, 5);
stream = Arrays.stream(values, from, to); // values is an int[] array1
2
2
与对象流一样,我们还可以使用静态的 generate 和 iterate 方法。此外,IntStream 和 LongStream 有静态方法 range 和 rangeClosed,可以生成步长为 1 的整数范围:
Java
IntStream zeroToNinetyNine = IntStream.range(0, 100); // Upper bound is excluded
IntStream zeroToHundred = IntStream.rangeClosed(0, 100); // Upper bound is included1
2
2
CharSequence 接口拥有 codePoints 和 chars 方法,可以生成由字符的 Unicode 码或由 UTF-16 编码机制的码元构成的 IntStream:
Java
String sentence = "\uD835\uDD46 is the set of octonions."; // \uD835\uDD46 is the UTF-16 encoding of the letter 𝕆, unicode U+1D546
IntStream codes = sentence.codePoints(); // The stream with hex values 1D546 20 69 73 20 ...1
2
2
当你有一个对象流时,可以用 mapToInt、mapToLong 或 mapToDouble 将其转换为基本类型流。例如,如果你有一个字符串流,并想将其长度处理为整数,那么就可以在 IntStream 中实现此目的:
Java
Stream<String> words = ...;
IntStream lengths = words.mapToInt(String::length);1
2
2
为了将基本类型流转换为对象流,需要使用 boxed 方法:
Java
Stream<Integer> integers = IntStream.range(0, 100).boxed();通常,基本类型流上的方法与对象流上的方法类似。下面是主要的差异:
toArray方法会返回基本类型数组;- 产生可选结果的方法会返回一个
OptionalInt、OptionalLong或OptionalDouble。这些类与Optional类类似,但是具有getAsInt、getAsLong和getAsDouble方法,而不是get方法; - 具有分别返回总和、平均值、最大值和最小值的
sum、average、max和min方法。对象流没有定义这些方法; summaryStatistics方法会产生一个类型为IntSummaryStatistics、LongSummaryStatistics或DoubleSummaryStatistics的对象,它们可以同时报告流的总和、数量、平均值、最大值和最小值;
Note:
Random类具有ints、longs和doubles方法,它们会返回由随机数构成的基本类型流。如果需要的是并行流中的随机数,那么需要使用SplittableRandom类。
14. 并行流
流使并行处理块操作变得很容易。这个过程几乎是自动的,但是需要遵守一些规则。首先,必须有一个并行流。可以用 Collection.parallelStream() 方法从任何集合中获取一个并行流:
Java
Stream<String> parallelWords = words.parallelStream();而且,parallel 方法可以将任意的顺序流转换为并行流。
Java
Stream<String> parallelWords = Stream.of(wordArray).parallel();只要在终结方法执行时流处于并行模式,所有的中间流操作就都将被并行化。
假设你想要对字符串流中的所有短单词计数,如果用长度将字符串分组,然后分别对它们进行计数,那么就可以安全地并行化这项计算:
Java
Map<Integer, Long> shortWordCounts = words.parallelStream()
.filter(s -> s.length() < 12)
.collect(groupingBy(String::length, counting()));1
2
3
2
3
默认情况下,从有序集合(数组和列表)、范围、生成器和迭代器产生的流,或者通过调用 Stream.sorted 产生的流,都是有序的。它们的结果是按照原来元素的顺序累积的,因此是完全可预知的。如果运行相同的操作两次,将会得到完全相同的结果。
排序并不排斥高效的并行处理。例如,当计算 stream.map(fun) 时,流可以被划分为 n 部分,它们会被并行地外理。然后,结果将会按照顺序重新组装起来。
当放弃排序需求时,有些操作可以被更有效地并行化。通过在流上调用 Stream.unordered 方法,就可以明确表示我们对排序不感兴趣。Stream.distinct 就是从这种方式中获益的一种操作。在有序的流中,distinct 会保留所有相同元素中的第一个,这对并行化是一种阻碍,因为处理每个部分的线程在其之前的所有部分都被处理完之前,并不知道应该丢弃哪些元素。如果可以接受保留唯一元素中任意一个的做法,那么所有部分就可以并行地处理(使用共享的集合来跟踪重复元素)。
还可以通过放弃排序要求来提高 limit 方法的速度。如果只想从流中取出任意 n 个元素,而并不在意到底要获取哪些,那么可以调用:
Java
Stream<String> sample = words.parallelStream().unordered().limit(n);合并映射表的代价很高昂。正是这个原因,Collectors.groupingByConcurrent 方法使用了共享的并发映射表。为了从并行化中获益,映射表中值的顺序不会与流中的顺序相同。
Java
Map<Integer, List<String>> result = words.parallelStream().collect(groupingByConcurrent(String::length)); // Values aren’t collected in stream order当然,如果使用独立于排序的下游收集器,那么就不必在意了,例如:
Java
Map<Integer, Long> wordCounts = words.parallelStream().collect(groupingByConcurrent(String::length, counting()));不要指望通过将所有的流都转换为并行流就能够加速操作,要牢记下面几条:
- 并行化会导致大量的开销,只有面对非常大的数据集才划算;
- 只有在底层的数据源可以被有效地分割为多个部分时,将流并行化才有意义;
- 并行流使用的线程池可能会因诸如文件 I/O 或网络访问这样的操作被阻塞而饿死;
Note 1:在 Java 9 之前,对
Files.lines方法返回的流进行并行化是没有意义的。因为数据是不可分割的,所以我们只能在读取文件的后半部分之前读取前半部分。现在,该方法使用的是内存映射文件,因此可以有效地进行分割。如果想要处理一个大型文件的各个行,并行化这个流可能会提高性能。
Note 2
默认情况下,并行流使用的是
ForkJoinPool.commonPool返回的全局fork-join池。只有在操作不会阻塞并且我们不会将这个池与其他任务共享的情况下,这种方式才不会有什么问题。有一种解决方法是使用另一个不同的池,即把操作放置到定制的池的submit方法中:JavaForkJoinPool customPool = ...; result = customPool.submit(() -> stream.parallel().map(...).collect(...)).get();1
2或者,使用异步方式:
JavaCompletableFuture.supplyAsync(() -> stream.parallel().map(. . .).collect(. . .), customPool).thenAccept(result -> . . .);