example
public void test() throws Exception {
    ExecutorService threadPoolExecutor = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    List<Future> futureList = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
        Future future = threadPoolExecutor.submit(() -> {
            System.out.println("do some work");
            int time = new Random().nextInt(2000);
            try {
                Thread.sleep(time);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        futureList.add(future);
    }
    for (Future future : futureList) {
        future.get(1000, TimeUnit.MILLISECONDS);
    }
}
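I want to run a batch of queries within 1 second. If a query times out, I discard its result; at the end I collect the results of all the queries that succeeded.
The current code is flawed: every get on a future from futureList blocks for up to 1 second, so the total wait adds up to more than 1 second. Is there a good approach or an existing library for this?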
1
Ariver 2021-04-16 17:04:43 +08:00
You need Reactor.
2
DanielGuo 2021-04-16 17:06:22 +08:00
public void test() throws Exception {
    ExecutorService threadPoolExecutor = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    List<Future> futureList = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
        Future future = threadPoolExecutor.submit(() -> {
            System.out.println("do some work");
            int time = new Random().nextInt(2000);
            try {
                Thread.sleep(time);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        futureList.add(future);
    }
    Thread.sleep(1000);
    boolean allDone = futureList.stream().map(f -> f.isDone()).allMatch(result -> result == true);
    if (allDone) {
        for (Future future : futureList) {
            future.get();
        }
    }
}
3
DanielGuo 2021-04-16 17:07:04 +08:00
Wait one second, check whether all of them are done, then fetch the results...
4
guxingke 2021-04-16 17:08:08 +08:00
CompletableFuture.allOf(f1, f2, ..., fn).get(timeout, unit)
> Might work; I haven't verified it.
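The suggestion above can be sketched roughly as follows. This is a minimal sketch, not a verified solution from the thread: `AllOfTimeoutSketch`, `slowQuery`, and `queryAll` are hypothetical names, and `Thread.sleep` stands in for the real query. It waits on `allOf(...)` for the whole time budget once, then keeps only the futures that completed normally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AllOfTimeoutSketch {

    // Hypothetical stand-in for a query: sleeps for the given time, then returns it.
    static int slowQuery(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return millis;
    }

    // Waits at most timeoutMillis in total, then returns the results of the
    // queries that had completed normally by the time the wait ended.
    static List<Integer> queryAll(int[] delays, long timeoutMillis, ExecutorService pool)
            throws InterruptedException {
        List<CompletableFuture<Integer>> futures = new ArrayList<>();
        for (int delay : delays) {
            futures.add(CompletableFuture.supplyAsync(() -> slowQuery(delay), pool));
        }
        try {
            // One shared wait for the whole batch instead of one wait per future.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                    .get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            // Timed out, or some query failed: keep whatever succeeded.
        }
        List<Integer> results = new ArrayList<>();
        for (CompletableFuture<Integer> f : futures) {
            if (f.isDone() && !f.isCompletedExceptionally()) {
                results.add(f.join()); // already complete, so join() does not block
            }
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        try {
            System.out.println(queryAll(new int[] {100, 200, 5000}, 1000, pool));
        } finally {
            pool.shutdownNow(); // interrupts the query that is still sleeping
        }
    }
}
```

Note that the unfinished queries keep running after the timeout unless the pool is shut down or they are stopped some other way; the sketch only stops waiting for them.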
5
zhangslob669 OP @DanielGuo That forces a full wait every time, which is not an elegant approach; besides, our project does not allow code like Thread.sleep(1000);
7
securityCoding 2021-04-16 17:19:50 +08:00
Just use CompletableFuture.
8
SlipStupig 2021-04-16 17:21:51 +08:00
Handle the results with asynchronous callbacks.
9
xiaoxinshiwo 2021-04-16 17:33:37 +08:00
Isn't CountDownLatch good enough?
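One way the CountDownLatch idea might look, as a sketch under assumptions: `LatchSketch` and `queryAll` are hypothetical names, and `Thread.sleep` stands in for the query. Each task stores its result and counts down; the caller waits on the latch for at most the time budget and then takes a snapshot of whatever has arrived.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class LatchSketch {

    // Returns the results that arrived before the latch wait ended,
    // in completion order (not submission order).
    static List<Integer> queryAll(int[] delays, long timeoutMillis, ExecutorService pool)
            throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(delays.length);
        Queue<Integer> finished = new ConcurrentLinkedQueue<>();
        for (int delay : delays) {
            pool.execute(() -> {
                try {
                    Thread.sleep(delay);  // hypothetical stand-in for the real query
                    finished.add(delay);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    latch.countDown();
                }
            });
        }
        // Returns early if every task finishes, otherwise after the timeout.
        latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        // Copy, so that late results added afterwards are not seen by the caller.
        return new ArrayList<>(finished);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        try {
            System.out.println(queryAll(new int[] {100, 200, 5000}, 1000, pool));
        } finally {
            pool.shutdownNow();
        }
    }
}
```

Unlike Thread.sleep(1000), the latch returns as soon as all tasks are done, so a fast batch does not pay the full second.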
10
blisteringsands 2021-04-16 19:42:10 +08:00
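After submit(), take the current time and add 1 second to get a deadline.
Before each future.get, take the current time again and subtract it from the deadline to get the remaining wait time.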
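The deadline approach described above stays closest to the original code. A minimal sketch, assuming hypothetical names (`DeadlineSketch`, `submitAll`, `collectWithin`) and `Thread.sleep` as a stand-in for the query: all get calls share one deadline, so the total wait is bounded by the time budget rather than by the budget times the number of futures.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DeadlineSketch {

    // Hypothetical helper: submits one sleeping "query" per delay.
    static List<Future<Integer>> submitAll(ExecutorService pool, int[] delays) {
        List<Future<Integer>> futures = new ArrayList<>();
        for (int delay : delays) {
            futures.add(pool.submit(() -> {
                Thread.sleep(delay); // stand-in for the real query
                return delay;
            }));
        }
        return futures;
    }

    // Collects results in submission order, spending at most timeoutMillis overall.
    static <T> List<T> collectWithin(List<Future<T>> futures, long timeoutMillis)
            throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        List<T> results = new ArrayList<>();
        for (Future<T> f : futures) {
            // Recompute the remaining budget before every get().
            long remaining = Math.max(deadline - System.nanoTime(), 0L);
            try {
                // With 0 remaining, get() still returns results that are already done.
                results.add(f.get(remaining, TimeUnit.NANOSECONDS));
            } catch (TimeoutException e) {
                f.cancel(true); // give up on this query and interrupt it
            } catch (ExecutionException e) {
                // The query itself failed: skip it.
            }
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        try {
            List<Future<Integer>> futures = submitAll(pool, new int[] {100, 5000, 200});
            System.out.println(collectWithin(futures, 1000));
        } finally {
            pool.shutdownNow();
        }
    }
}
```

Here the deadline starts when collectWithin is called, which in this sketch is right after submission.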
11
zzl22100048 2021-04-16 19:56:01 +08:00 via iPhone
Your requirement is a perfect fit for CompletableFuture.
12
zhady009 2021-04-16 19:59:52 +08:00
CompletableFuture has completeOnTimeout (Java 9+), which lets you supply a default value when the timeout fires; use null,
then filter out the nulls at the end.
13
zhady009 2021-04-16 20:09:49 +08:00
```java
@Test
public void demo() {
    QueryTask var0 = new QueryTask(900);
    QueryTask var1 = new QueryTask(2100);
    QueryTask var2 = new QueryTask(2000);
    QueryTask var3 = new QueryTask(2000);
    Demo<QueryTask, Integer> test = new Demo<>(1000, List.of(var0, var1, var2, var3));
    long l = System.currentTimeMillis();
    Collection<Integer> d = test.execute();
    System.out.println(System.currentTimeMillis() - l);
    assert d.size() > 0;
    for (Integer integer : d) {
        assert integer <= 1000;
    }
}

static class Demo<T extends Supplier<E>, E> {
    private static final ExecutorService ES = Executors.newFixedThreadPool(10);
    private final int timeout;
    private final Collection<T> tasks;

    Demo(int timeout, Collection<T> tasks) {
        this.timeout = timeout;
        this.tasks = tasks;
    }

    public List<E> execute() {
        // Each future completes with null if its task has not finished within the timeout.
        List<CompletableFuture<E>> collect = tasks.stream()
                .map(x -> CompletableFuture.supplyAsync(x, ES)
                        .completeOnTimeout(null, timeout, TimeUnit.MILLISECONDS))
                .collect(Collectors.toUnmodifiableList());
        // Once all futures are complete, drop the nulls left by timed-out tasks.
        CompletableFuture<List<E>> listCompletableFuture =
                CompletableFuture.allOf(collect.toArray(new CompletableFuture[collect.size()]))
                        .thenApply(v -> collect.stream()
                                .map(CompletableFuture::join)
                                .filter(Objects::nonNull)
                                .collect(Collectors.toList()));
        return listCompletableFuture.join();
    }
}

static class QueryTask implements Supplier<Integer> {
    private final int time;

    QueryTask(int time) {
        this.time = time;
    }

    @Override
    public Integer get() {
        try {
            // query
            Thread.sleep(time);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return time;
    }
}
```
14
zhgg0 2021-04-17 00:37:05 +08:00 via iPhone
Use the thread pool's invokeAll method.
Or recompute the timeout right before each get.
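A minimal sketch of the invokeAll variant, assuming hypothetical names (`InvokeAllSketch`, `sleepTasks`, `queryAll`) and `Thread.sleep` as a stand-in for the query. `ExecutorService.invokeAll(tasks, timeout, unit)` blocks until all tasks finish or the timeout expires, and cancels the tasks that have not completed, so the cancelled futures can simply be skipped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class InvokeAllSketch {

    // Hypothetical helper: one sleeping "query" per delay.
    static List<Callable<Integer>> sleepTasks(int... delays) {
        List<Callable<Integer>> tasks = new ArrayList<>();
        for (int delay : delays) {
            tasks.add(() -> {
                Thread.sleep(delay); // stand-in for the real query
                return delay;
            });
        }
        return tasks;
    }

    // Runs all tasks with one shared timeout and returns the successful results
    // in the same order as the task list.
    static <T> List<T> queryAll(ExecutorService pool, List<Callable<T>> tasks, long timeoutMillis)
            throws InterruptedException {
        List<Future<T>> futures = pool.invokeAll(tasks, timeoutMillis, TimeUnit.MILLISECONDS);
        List<T> results = new ArrayList<>();
        for (Future<T> f : futures) {
            if (f.isCancelled()) {
                continue; // did not finish in time
            }
            try {
                results.add(f.get()); // already done, returns immediately
            } catch (ExecutionException e) {
                // The query itself failed: skip it.
            }
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        try {
            System.out.println(queryAll(pool, sleepTasks(100, 5000, 200), 1000));
        } finally {
            pool.shutdownNow();
        }
    }
}
```

This works on Java 8 and needs no manual deadline arithmetic, but the pool needs enough threads to start every task at once; a task still waiting in the queue when the timeout expires is cancelled without ever running.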
15
yazinnnn 2021-04-17 18:28:55 +08:00
val list = (0..9).map {
    async {
        withTimeoutOrNull(1000) {
            val long = Random.nextLong(2000)
            delay(long)
            it
        }
    }
}
println(list.map { it.await() })

[0, 1, 2, 3, null, 5, 6, null, null, 9]

Kotlin coroutines make this easy to implement...
Or use CompletableFuture on JDK 11.
Or, on JDK 8, use Vert.x's Promise API...

fun main() {
    println("start ${Date()}")
    foo()
    println("end ${Date()}")
}

var threadPoolExecutor = ThreadPoolExecutor(10, 10, 0L, TimeUnit.SECONDS, LinkedBlockingQueue())

fun foo() {
    val future = (0..9).map { getSomething() }
    println(future.map { it.result() })
    Thread.sleep(1000)
    println(future.map { it.result() })
}

fun getSomething(): Future<String> {
    val promise = Promise.promise<String>()
    threadPoolExecutor.execute {
        Thread.sleep(Random.nextLong(1500))
        val result = Random.nextLong(3000).toString()
        promise.complete(result)
    }
    return promise.future()
}

start Sat Apr 17 17:56:12 CST 2021
[null, null, null, null, null, null, null, null, null, null]
[2255, null, 2370, 750, 1399, 2796, null, null, 39, null]
end Sat Apr 17 17:56:13 CST 2021

But this way the tasks cannot be cancelled...