博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
CompletionService简讲
阅读量:5039 次
发布时间:2019-06-12

本文共 2557 字,大约阅读时间需要 8 分钟。

背景

最近在项目中看到太多后台task中使用Executor框架,提交任务后,把future都一个个加入到list,再一个个get这些future的代码。

这个的问题在于一方面没有时限,可能会被某些运行缓慢的future拖很久。即便使用带超时控制的get方法,这样加入list再get的做法依然很繁琐。

其实在《Java并发编程实战》或者《Java多线程编程的艺术》这些书中都介绍过JDK提供了CompletionService接口。

例程

JDK为我们提供的CompletionService接口的默认实现是java.util.concurrent.ExecutorCompletionService。在它的Java Doc中已经给出两个demo例程。

void solve(Executor e, Collection
> solvers) throws InterruptedException, ExecutionException { CompletionService
ecs = new ExecutorCompletionService
(e); for (Callable
s : solvers) ecs.submit(s); int n = solvers.size(); for (int i = 0; i < n; ++i) { Result r = ecs.take().get(); if (r != null) use(r); } }

上面的例程展示了CompletionService的基本使用。它的实现融合了Executor和BlockingQueue。可以看到任务的执行依托于内部的Executor,而一个任务完成后会被加到阻塞队列中,调用线程可以及时获取到新完成的任务。

如下所示为Java Doc中另一个例程。

void solve(Executor e, Collection
> solvers) throws InterruptedException { CompletionService
ecs = new ExecutorCompletionService
(e); int n = solvers.size(); List
> futures = new ArrayList
>(n); Result result = null; try { for (Callable
s : solvers) futures.add(ecs.submit(s)); for (int i = 0; i < n; ++i) { try { Result r = ecs.take().get(); if (r != null) { result = r; break; } } catch (ExecutionException ignore) {} } } finally { for (Future
f : futures) f.cancel(true); } if (result != null) use(result); }

上面的例程中,用于获取第一个返回值不为null的任务结果,并取消其他任务。

原理

ExecutorCompletionService的源码实现非常简单。

内部就三个东西:

// 构造方法传入的executor实例。private final Executor executor;// 如果构造方法传入的executor实例是AbstractExecutorService子类,则类型转化后保存。private final AbstractExecutorService aes;// 用于保存完成的future,所谓完成可以是有异常或者已经取消。private final BlockingQueue
> completionQueue;

内部最核心的嵌套类是:

private class QueueingFuture extends FutureTask
{ QueueingFuture(RunnableFuture
task) { super(task, null); this.task = task; } protected void done() { completionQueue.add(task); } private final Future
task;}

QueueingFuture内部组合了一个RunnableFuture,然后在构造方法中通过父类FutureTask的构造方法,将之转化为FutureTask的内部callable。

它对FutureTask中的钩子方法done进行了覆盖,将构造函数传入的RunnableFuture在完成后加到阻塞队列中。

FutureTask的done方法会在任务正常完成/发生异常/被取消后被调用。更多源码可以参考我的。

剩余提交任务的各种submit方法,无非就是在原来的FutureTask上用QueueingFuture套上一套,实现任务在完成后加到阻塞队列的逻辑。

而获取任务的take/poll方法的实现就是调用内部阻塞队列而已。

至此,全部讲完了。

转载于:https://www.cnblogs.com/micrari/p/7495925.html

你可能感兴趣的文章
phpcms 添加自定义表单 留言
查看>>
mysql 优化
查看>>
读书笔记 ~ Nmap渗透测试指南
查看>>
WCF 配置文件
查看>>
动态调用WCF服务
查看>>
oracle导出/导入 expdp/impdp
查看>>
类指针
查看>>
css修改滚动条样式
查看>>
2018.11.15 Nginx服务器的使用
查看>>
Kinect人机交互开发实践
查看>>
百度编辑器UEditor ASP.NET示例Demo 分类: ASP.NET...
查看>>
JAVA 技术类分享(二)
查看>>
android客户端向服务器发送请求中文乱码的问
查看>>
UOJ#220. 【NOI2016】网格 Tarjan
查看>>
Symfony翻译教程已开课
查看>>
Python模块之pickle(列表,字典等复杂数据类型与二进制文件的转化)
查看>>
通过数据库表反向生成pojo类
查看>>
css_去掉默认样式
查看>>
TensorFlow2.0矩阵与向量的加减乘
查看>>
NOIP 2010题解
查看>>