一、简介
Guava 为我们提供了ListenableFuture
其中包含一个基于默认 Java Future.
让我们看看我们如何利用它来发挥我们的优势。
2. Future
, ListenableFuture
和Futures
让我们简要地看看这些不同的类是什么以及它们是如何相互关联的。
2.1. Future
从Java 5,
我们可以使用java.util.concurrent.Future
来表示异步任务。
Future
允许我们访问已经完成或将来可能完成的任务的结果,并支持取消它们。
2.2. ListenableFuture
java.util.concurrent.Future
时缺少的一个功能是添加侦听器以在完成时运行,这是大多数流行的异步框架提供的常见功能。
Guava 通过允许我们将侦听器附加到它的[com.google.common.util.concurrent.ListenableFuture](https://guava.dev/releases/29.0-jre/api/docs/com/google/common/util/concurrent/ListenableFuture.html "可听未来") .
2.3. Futures
Guava 为我们提供了便利类com.google.common.util.concurrent.Futures
以使其更容易使用其ListenableFuture.
该类提供了多种与ListenableFuture,
交互的方式,其中包括支持添加成功/失败回调,并允许我们通过聚合或转换来协调多个期货。
3. 简单使用
现在让我们看看如何以最简单的方式ListenableFuture
创建和添加回调。
3.1.创造ListenableFuture
**ListenableFuture
的最简单方法是向ListeningExecutorService
**提交任务(很像我们如何使用普通的ExecutorService
来获取普通的Future
):
ExecutorService execService = Executors.newSingleThreadExecutor();
ListeningExecutorService lExecService = MoreExecutors.listeningDecorator(execService);
ListenableFuture<Integer> asyncTask = lExecService.submit(() -> {
TimeUnit.MILLISECONDS.sleep(500); // long running task
return 5;
});
请注意我们如何使用MoreExecutors
类将ExecutorService
装饰为ListeningExecutorService.
我们可以参考线程池在 Guava 中的实现来了解更多关于MoreExecutors
。
如果我们已经有一个返回Future
的 API 并且我们需要将它转换为ListenableFuture
,这很容易完成 通过初始化它的具体实现ListenableFutureTask:
// old api
public FutureTask<String> fetchConfigTask(String configKey) {
return new FutureTask<>(() -> {
TimeUnit.MILLISECONDS.sleep(500);
return String.format("%s.%d", configKey, new Random().nextInt(Integer.MAX_VALUE));
});
}
// new api
public ListenableFutureTask<String> fetchConfigListenableTask(String configKey) {
return ListenableFutureTask.create(() -> {
TimeUnit.MILLISECONDS.sleep(500);
return String.format("%s.%d", configKey, new Random().nextInt(Integer.MAX_VALUE));
});
}
我们需要注意,除非我们将它们提交给Executor.
直接与ListenableFutureTask
交互不是常见用法,仅在极少数情况下完成(例如:实现我们自己的ExecutorService
)。实际使用请参考 Guava 的[AbstractListeningExecutorService](https://github.com/google/guava/blob/v18.0/guava/src/com/google/common/util/concurrent/AbstractListeningExecutorService.java "AbstractListeningExecutorService")
如果我们的异步任务无法使用ListeningExecutorService
或提供的Futures
实用程序方法,我们也可以使用com.google.common.util.concurrent.SettableFuture
,并且我们需要手动设置未来值。对于更复杂的用法,我们还可以考虑com.google.common.util.concurrent.AbstractFuture.
3.2.添加侦听器/回调
我们可以**ListenableFuture
添加侦听器的一种Futures.addCallback(),
注册回调,使我们可以在成功或失败发生时访问结果或异常:**
Executor listeningExecutor = Executors.newSingleThreadExecutor();
ListenableFuture<Integer> asyncTask = new ListenableFutureService().succeedingTask()
Futures.addCallback(asyncTask, new FutureCallback<Integer>() {
@Override
public void onSuccess(Integer result) {
// do on success
}
@Override
public void onFailure(Throwable t) {
// do on failure
}
}, listeningExecutor);
我们还**可以通过直接将侦听器添加到ListenableFuture.
**请注意,此侦听器将在未来成功或异常完成时运行。另请注意,我们无权访问异步任务的结果:
Executor listeningExecutor = Executors.newSingleThreadExecutor();
int nextTask = 1;
Set<Integer> runningTasks = ConcurrentHashMap.newKeySet();
runningTasks.add(nextTask);
ListenableFuture<Integer> asyncTask = new ListenableFutureService().succeedingTask()
asyncTask.addListener(() -> runningTasks.remove(nextTask), listeningExecutor);
4. 复杂的用法
现在让我们看看如何在更复杂的场景中使用这些期货。
4.1.扇入
我们有时可能需要调用多个异步任务并收集它们的结果,通常称为扇入操作。
Guava 为我们提供了两种方法。但是,我们应该谨慎地根据我们的要求选择正确的方法。假设我们需要协调以下异步任务:
ListenableFuture<String> task1 = service.fetchConfig("config.0");
ListenableFuture<String> task2 = service.fetchConfig("config.1");
ListenableFuture<String> task3 = service.fetchConfig("config.2");
扇入多个期货的一种方法是使用Futures.allAsList()
方法。如果所有期货都成功,这允许我们按照提供的期货的顺序收集所有期货的结果。如果这些 future 中的任何一个失败,那么整个结果就是一个失败的 future:
ListenableFuture<List<String>> configsTask = Futures.allAsList(task1, task2, task3);
Futures.addCallback(configsTask, new FutureCallback<List<String>>() {
@Override
public void onSuccess(@Nullable List<String> configResults) {
// do on all futures success
}
@Override
public void onFailure(Throwable t) {
// handle on at least one failure
}
}, someExecutor);
如果我们需要收集所有异步任务的结果,无论它们是否失败,我们都可以使用Futures.successfulAsList()
。这将返回一个列表,其结果将与传递给参数的任务具有相同的顺序,失败的任务将在列表中的各自位置分配null
ListenableFuture<List<String>> configsTask = Futures.successfulAsList(task1, task2, task3);
Futures.addCallback(configsTask, new FutureCallback<List<String>>() {
@Override
public void onSuccess(@Nullable List<String> configResults) {
// handle results. If task2 failed, then configResults.get(1) == null
}
@Override
public void onFailure(Throwable t) {
// handle failure
}
}, listeningExecutor);
在上面的用法中我们应该小心,如果未来的任务通常null
,它将与失败的任务(也将结果设置为null
)无法区分。
4.2.用组合器扇入
如果我们需要协调多个返回不同结果的期货,上面的解决方案可能还不够。在这种情况下,我们可以使用扇入操作的组合器变体来协调这种期货组合。
与简单的扇入操作类似, Guava 为我们提供了两种变体;一种在所有任务成功完成时成功,一种在使用Futures.whenAllSucceed()
和Futures.whenAllComplete()
方法时即使某些任务失败也成功。
让我们看看如何使用Futures.whenAllSucceed()
组合来自多个期货的不同结果类型:
ListenableFuture<Integer> cartIdTask = service.getCartId();
ListenableFuture<String> customerNameTask = service.getCustomerName();
ListenableFuture<List<String>> cartItemsTask = service.getCartItems();
ListenableFuture<CartInfo> cartInfoTask = Futures.whenAllSucceed(cartIdTask, customerNameTask, cartItemsTask)
.call(() -> {
int cartId = Futures.getDone(cartIdTask);
String customerName = Futures.getDone(customerNameTask);
List<String> cartItems = Futures.getDone(cartItemsTask);
return new CartInfo(cartId, customerName, cartItems);
}, someExecutor);
Futures.addCallback(cartInfoTask, new FutureCallback<CartInfo>() {
@Override
public void onSuccess(@Nullable CartInfo result) {
//handle on all success and combination success
}
@Override
public void onFailure(Throwable t) {
//handle on either task fail or combination failed
}
}, listeningExecService);
如果我们需要允许某些任务失败,我们可以使用Futures.whenAllComplete()
。虽然语义与上面的基本相似,但我们应该注意,当对它们调用Futures.getDone()
ExecutionException
4.3.转型
有时我们需要转换成功后的结果。 Futures.transform()
和Futures.lazyTransform()
为我们提供了两种方法。
让我们看看如何使用Futures.transform()
来转换 Future 的结果。只要转换计算量不大,就可以使用它:
ListenableFuture<List<String>> cartItemsTask = service.getCartItems();
Function<List<String>, Integer> itemCountFunc = cartItems -> {
assertNotNull(cartItems);
return cartItems.size();
};
ListenableFuture<Integer> itemCountTask = Futures.transform(cartItemsTask, itemCountFunc, listenExecService);
**我们还可以使用Futures.lazyTransform()
**将转换函数应用于java.util.concurrent.Future.
我们需要记住,这个选项不返回一个ListenableFuture
而是一个普通的java.util.concurrent.Future
get()
在结果未来上调用时都会应用转换函数。
4.4.鍊式期货
我们可能会遇到我们的期货需要调用其他期货的情况。在这种情况下,Guava 为我们提供了async()
变体来安全地链接这些期货以一个接一个地执行。
让我们看看如何使用Futures.submitAsync()
从提交Callable
内部调用未来:
AsyncCallable<String> asyncConfigTask = () -> {
ListenableFuture<String> configTask = service.fetchConfig("config.a");
TimeUnit.MILLISECONDS.sleep(500); //some long running task
return configTask;
};
ListenableFuture<String> configTask = Futures.submitAsync(asyncConfigTask, executor);
如果我们想要真正的链接,其中一个 future 的结果被馈送到另一个 future 的计算中,我们可以使用Futures.transformAsync()
:
ListenableFuture<String> usernameTask = service.generateUsername("john");
AsyncFunction<String, String> passwordFunc = username -> {
ListenableFuture<String> generatePasswordTask = service.generatePassword(username);
TimeUnit.MILLISECONDS.sleep(500); // some long running task
return generatePasswordTask;
};
ListenableFuture<String> passwordTask = Futures.transformAsync(usernameTask, passwordFunc, executor);
Guava 还为我们提供了Futures.scheduleAsync()
和Futures.catchingAsync()
来分别提交计划任务和提供错误恢复的回退任务。虽然它们迎合不同的场景,但我们不会讨论它们,因为它们类似于其他async()
调用。
5. 使用注意事项
现在让我们研究一下在使用期货时可能遇到的一些常见陷阱以及如何避免它们。
5.1.工作与倾听执行者
在使用 Guava 期货时,了解工作执行者和监听执行者之间的区别很重要。例如,假设我们有一个异步任务来获取配置:
public ListenableFuture<String> fetchConfig(String configKey) {
return lExecService.submit(() -> {
TimeUnit.MILLISECONDS.sleep(500);
return String.format("%s.%d", configKey, new Random().nextInt(Integer.MAX_VALUE));
});
}
假设我们想为上述未来附加一个侦听器:
ListenableFuture<String> configsTask = service.fetchConfig("config.0");
Futures.addCallback(configsTask, someListener, listeningExecutor);
请注意, lExecService
是运行异步任务的执行器,而listeningExecutor
器的执行器。
如上所示,我们应该始终考虑将这两个执行程序分开,以避免出现我们的侦听器和工作线程争用相同线程池资源的情况。共享同一个 executor 可能会导致我们的繁重任务使 listener 执行饿死。或者一个写得很糟糕的重量级听众最终阻止了我们重要的重型任务。
5.2.小心使用directExecutor()
虽然我们可以MoreExecutors.directExecutor()
和MoreExecutors.newDirectExecutorService()
来更容易地处理异步执行,但在生产代码中我们应该小心使用它们。
当我们从上述方法中获取到 executor 时,我们提交给它的任何任务,无论是重量级的还是监听器,都会在当前线程上执行。如果当前执行上下文是需要高吞吐量的上下文,则这可能很危险。
比如使用一个directExecutor
,在UI线程中向它提交一个重量级的任务,就会自动阻塞我们的UI线程。
我们也可能面临这样一种情况:我们的听众 最终会减慢我们所有其他侦听器(即使是那些与directExecutor
)。这是因为 Guava 在各自的Executors,
while
循环中的所有侦听器,但是directExecutor
会导致侦听器运行在与while
循环相同的线程中。
5.3.嵌套期货不好
在使用鍊式期货时,我们应该注意不要以创建嵌套期货的方式从另一个期货内部调用一个期货:
public ListenableFuture<String> generatePassword(String username) {
return lExecService.submit(() -> {
TimeUnit.MILLISECONDS.sleep(500);
return username + "123";
});
}
String firstName = "john";
ListenableFuture<ListenableFuture<String>> badTask = lExecService.submit(() -> {
final String username = firstName.replaceAll("[^a-zA-Z]+", "")
.concat("@service.com");
return generatePassword(username);
});
如果我们曾经看到过包含ListenableFuture<ListenableFuture<V>>,
那么我们应该知道这是一个写得很糟糕的未来,因为外部未来的取消和完成有可能竞争,并且取消可能不会传播到内心的未来。
如果我们看到上述情况,我们应该始终使用Futures.async()
变体以连接的方式安全地解开这些鍊式期货。
5.4.小心JdkFutureAdapters.listenInPoolThread()
Guava 建议我们利用其ListenableFuture
的最佳方式是将所有使用Future
的代码转换为ListenableFuture.
如果在某些情况下这种转换不可行, Guava 为我们提供了适配器来使用JdkFutureAdapters.listenInPoolThread()
覆盖来做到这一点。虽然这看起来很有帮助,但Guava 警告我们这些是重量级的适配器,应该尽可能避免。
六,结论
在本文中,我们看到了如何使用 Guava 的ListenableFuture
来丰富我们对 futures 的使用,以及如何使用Futures
API 来更轻松地使用这些 futures。
我们还看到了在使用这些期货和提供的执行程序时可能会犯的一些常见错误。
0 评论