一、概述
Java 的 Streams API 是一种功能强大且用途广泛的数据处理工具。 根据定义,流式操作是对一组数据的单次迭代。
但是,有时我们希望以不同的方式处理流的某些部分并获得不止一组结果。
在本教程中,我们将学习如何将流拆分为多个组并独立处理它们。
2. 使用收集器
一个 Stream 应该作一次并且有一个终端操作。 它可以有多个中间操作,但数据只能在关闭之前收集一次。
这意味着 Streams API 规范明确禁止分叉流并为每个分叉提供不同的中间操作。 这将导致多个终端操作。 但是,我们可以在终端操作里面拆分流。 这会创建一个分为两个或更多组的结果。
2.1。 使用进行二进制拆分partitioningBy
如果我们想将一个流一分为二,我们可以使用类中的 。 它接受一个并返回一个 ,它将满足 键下的谓词和下的其余元素分组。Collectors
partitioningBy
Predicate
Map
Boolean
true
false
假设我们有一个文章列表,其中包含有关它们应该发布到的目标站点以及它们是否应该被推荐的信息。
List<Article> articles = Lists.newArrayList(
new Article("Baeldung", true),
new Article("Baeldung", false),
new Article("Programming Daily", false),
new Article("The Code", false));
我们将它分为两组,一组仅包含 Baeldung 文章,第二组包含其余文章:
Map<Boolean, List<Article>> groupedArticles = articles.stream()
.collect(Collectors.partitioningBy(a -> a.target.equals("Baeldung")));
我们看看地图中的键下分别归档哪些文章:true
false
assertThat(groupedArticles.get(true)).containsExactly(
new Article("Baeldung", true),
new Article("Baeldung", false));
assertThat(groupedArticles.get(false)).containsExactly(
new Article("Programming Daily", false),
new Article("The Code", false));
2.2.使用拆分groupingBy
如果我们想要更多的类别,那么我们需要使用方法。 它需要一个函数,将每个元素分类到一个组中。 然后它返回一个将每个组分类器链接到其元素集合的 。groupingBy
Map
假设我们要按目标站点对文章进行分组。 返回的将具有包含站点名称的键和包含与给定站点关联的文章集合的值:Map
Map<String, List<Article>> groupedArticles = articles.stream()
.collect(Collectors.groupingBy(a -> a.target));
assertThat(groupedArticles.get("Baeldung")).containsExactly(
new Article("Baeldung", true),
new Article("Baeldung", false));
assertThat(groupedArticles.get("Programming Daily")).containsExactly(new Article("Programming Daily", false));
assertThat(groupedArticles.get("The Code")).containsExactly(new Article("The Code", false));
3. 使用teeing
从 Java 12 开始,我们为二进制拆分提供了另一种选择。 我们可以使用收集器。 teeing
将两个收集器组合成一个复合材料。 每个元素都由它们处理,然后使用提供的合并合并为单个返回值。teeing
3.1。 与Predicate
teeing
teeing
收集器与Collectors
类中的另一个收集器很好地配对,称为filtering
。 它接受一个谓词并使用它来过滤处理过的元素,然后将它们传递给另一个收集器。
让我们将文章分为 Baeldung 和非 Baeldung 的组并计算它们。 我们还将使用构造函数作为合并函数:List
List<Long> countedArticles = articles.stream().collect(Collectors.teeing(
Collectors.filtering(article -> article.target.equals("Baeldung"), Collectors.counting()),
Collectors.filtering(article -> !article.target.equals("Baeldung"), Collectors.counting()),
List::of));
assertThat(countedArticles.get(0)).isEqualTo(2);
assertThat(countedArticles.get(1)).isEqualTo(2);
3.2.处理结果eeing
此解决方案与以前的解决方案之间存在一个重要区别。 我们之前创建的组没有重叠,源流中的每个元素最多属于一个组。 使用我们不再受此限制的约束,因为每个收集器都可能处理整个流。 让我们看看如何利用它。teeing,
我们可能希望将文章分为两组,一组仅包含特色文章,第二组仅包含 Baeldung 文章。 生成的文章集可能会重叠,因为一篇文章可以同时成为 Baeldung 的特色和目标。
这次我们将它们收集到列表中,而不是计数:
List<List<Article>> groupedArticles = articles.stream().collect(Collectors.teeing(
Collectors.filtering(article -> article.target.equals("Baeldung"), Collectors.toList()),
Collectors.filtering(article -> article.featured, Collectors.toList()),
List::of));
assertThat(groupedArticles.get(0)).hasSize(2);
assertThat(groupedArticles.get(1)).hasSize(1);
assertThat(groupedArticles.get(0)).containsExactly(
new Article("Baeldung", true),
new Article("Baeldung", false));
assertThat(groupedArticles.get(1)).containsExactly(new Article("Baeldung", true));
4. 使用 RxJava
虽然 Java 的 Streams API 是一个有用的工具,但有时它还不够。 其他解决方案,例如 RxJava 提供的响应式流,可能能够帮助我们。 让我们看一个简短的示例,说明如何使用和多个来实现与示例相同的结果。Observable
Subscribers
Stream
4.1。 创建一个Observable
首先,我们需要从文章列表中创建一个Observable
实例。 我们可以使用类的 factory 方法:Observable
from
Observable<Article> observableArticles = Observable.from(articles);
4.2.过滤Observables
接下来,我们需要创建将过滤文章的 。 为此,我们将使用类中的方法:Observables
Observable
filter
Observable<Article> baeldungObservable = observableArticles.filter(
article -> article.target.equals("Baeldung"));
Observable<Article> featuredObservable = observableArticles.filter(
article -> article.featured);
4.3.创建多个Subscribers
最后,我们需要订阅Observables
并提供一个Action
来描述我们想要对文章做什么。 一个真实的例子是将它们保存在数据库中或将它们发送给客户端,但我们将满足于将它们添加到列表中:
List<Article> baeldungArticles = new ArrayList<>();
List<Article> featuredArticles = new ArrayList<>();
baeldungObservable.subscribe(baeldungArticles::add);
featuredObservable.subscribe(featuredArticles::add);
5.结论
在本教程中,我们学习了如何将流拆分为组并分别处理它们。 首先,我们查看了较旧的 Streams API 方法: 和 。 接下来,我们使用了一种更新的方法,利用了 Java 12 中引入的方法。 最后,我们研究了如何使用 RxJava 来获得具有更大弹性的类似结果。groupingBy
partitionBy
teeing
0 评论