拨开荷叶行,寻梦已然成。仙女莲花里,翩翩白鹭情。
IMG-LOGO
主页 文章列表 Flux.create 和Flux.generate 之间的区别

Flux.create 和Flux.generate 之间的区别

白鹭 - 2022-06-25 2446 0 2

一、简介

Project Reactor 为JVM 提供了一个完全非阻塞的编程基础。它提供了Reactive Streams 规范的实现,并提供了可组合的异步API,例如Flux。Flux 是具有多个反应式运算符的反应式流发布者。它发出0 到N 个元素,然后成功或错误地完成。它可以根据我们的需要以几种不同的方式创建。

2. 理解通量

Flux 是一个Reactive Stream 发布者,可以发出0 到N 个元素它有几个运算符用于生成、编排和转换Flux 序列。Flux 可以成功完成,也可以完成但有错误。

Flux API 在Flux 上提供了几个静态工厂方法来创建源或从多个回调类型生成。它还提供实例方法和运算符来构建异步处理管道。该管道产生一个异步序列。

在接下来的部分中,让我们看看Fluxgenerate()create()方法的一些用法。

3. Maven依赖

我们需要[reactor-core](https://search.maven.org/search?q=g:io.projectreactor%20AND%20a:reactor-core&core=gav)reactor-testMaven 依赖项:

<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.4.17</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.4.17</version>
<scope>test</scope>
</dependency>

4.通量生成

Flux API 的generate()方法提供了一种简单直接的编程方法来创建Flux。generate()方法采用生成器函数来生成一系列项目。

生成方法有三种变体:

  • generate(Consumer<SynchronousSink<T>> generator)

  • generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator)

  • generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator, Consumer<? super S> stateConsumer)

generate 方法根据需要计算并发出值最好在计算下游可能不使用的元素的成本太高的情况下使用。如果发出的事件受应用程序状态的影响,也可以使用它。

4.1。例子

在这个例子中,让我们使用generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator)来生成一个Flux

public class CharacterGenerator {
public Flux<Character> generateCharacters() {
return Flux.generate(() -> 97, (state, sink) -> {
char value = (char) state.intValue();
sink.next(value);
if (value == 'z') {
sink.complete();
}
return state + 1;
});
}
}

generate()方法中,我们提供了两个函数作为参数:

  • 第一个是Callable函数。此函数定义生成器的初始状态,值为97

  • 第二个是BiFunction.这是一个使用SynchronousSink.每当调用接收器的next方法时,此SynchronousSink 都会返回一个项目

根据其名称,SynchronousSink实例同步工作。但是,我们不能在每个生成器调用中多次调用SynchronousSink对象的next方法。

让我们使用StepVerifier验证生成的序列:

@Test
public void whenGeneratingCharacters_thenCharactersAreProduced() {
CharacterGenerator characterGenerator = new CharacterGenerator();
Flux<Character> characterFlux = characterGenerator.generateCharacters().take(3);
StepVerifier.create(characterFlux)
.expectNext('a', 'b', 'c')
.expectComplete()
.verify();
}

在此示例中,订阅者仅请求三个项目。因此,生成的序列以发出三个字符——a、b 和c 结束。expectNext()期望从Flux 中得到我们期望的元素。expectComplete() 表示从Flux 发射元素的完成。

5.通量创建

当我们想要create()**不受应用程序状态影响的多个(0 到无穷大)值**时,使用Flux 中的create() 方法。这是因为Fluxcreate()方法的底层方法不断计算元素。

此外,下游系统决定了它需要多少元素。因此,如果下游系统无法跟上,已经发出的元素要么被缓冲要么被移除。

默认情况下,发出的元素会被缓冲,直到下游系统请求更多元素。

5.1。例子

现在让我们演示create()方法的示例:

public class CharacterCreator {
public Consumer<List<Character>> consumer;
public Flux<Character> createCharacterSequence() {
return Flux.create(sink -> CharacterCreator.this.consumer = items -> items.forEach(sink::next));
}
}

我们可以注意到create运算符要求我们使用FluxSink而不是generate() 中使用的SynchronousSink。在这种情况下,我们将为items列表中的每个项目调用next(),逐个发出。

现在让我们使用带有两个字符序列的CharacterCreator

@Test
public void whenCreatingCharactersWithMultipleThreads_thenSequenceIsProducedAsynchronously() throws InterruptedException {
CharacterGenerator characterGenerator = new CharacterGenerator();
List<Character> sequence1 = characterGenerator.generateCharacters().take(3).collectList().block();
List<Character> sequence2 = characterGenerator.generateCharacters().take(2).collectList().block();
}

我们在上面的代码片段中创建了两个序列——sequence1sequence2。这些序列用作字符项的来源。请注意,我们使用CharacterGenerator实例来获取字符序列。

现在让我们定义一个characterCreator实例和两个线程实例:

CharacterCreator characterCreator = new CharacterCreator();
Thread producerThread1 = new Thread(() -> characterCreator.consumer.accept(sequence1));
Thread producerThread2 = new Thread(() -> characterCreator.consumer.accept(sequence2));

我们现在正在创建两个线程实例,它们将为发布者提供元素。当调用接受运算符时,字符元素开始流入序列源。接下来,我们subscribe新的合并序列:

List<Character> consolidated = new ArrayList<>();
characterCreator.createCharacterSequence().subscribe(consolidated::add);

请注意,createCharacterSequence返回一个我们订阅的Flux 并使用consolidated列表中的元素。接下来,让我们触发查看项目在两个不同线程上移动的整个过程:

producerThread1.start();
producerThread2.start();
producerThread1.join();
producerThread2.join();

最后,让我们验证一下操作的结果:

assertThat(consolidated).containsExactlyInAnyOrder('a', 'b', 'c', 'a', 'b');

接收到的序列中的前三个字符来自sequence1.最后两个字符来自sequence2。由于这是一个异步操作,因此无法保证这些序列中元素的顺序。

6. Flux Create vs. Flux Generate

以下是create 和generate 操作之间的一些区别:

通量创建通量生成
此方法接受Consumer<FluxSink>的实例此方法接受Consumer<SynchronousSink>的实例
Create 方法只调用消费者一次生成方法根据下游应用的需要多次调用消费者方法
消费者可以立即发出0..N 个元素只能发射一种元素
发布者不知道下游状态。因此create 接受溢出策略作为流量控制的附加参数发布者根据下游应用需求生成元素
FluxSink允许我们在需要时使用多个线程发出元素对多线程没有用,因为它一次只发出一个元素

7. 结论

在本文中,我们讨论了Flux API 的create 和generate 方法之间的区别。

首先,我们介绍了反应式编程的概念并讨论了Flux API。然后我们讨论了Flux API 的create 和generate 方法。最后,我们提供了Flux API 的create 和generate 方法之间的差异列表。


标签:

0 评论

发表评论

您的电子邮件地址不会被公开。 必填的字段已做标记 *