V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
zhangslob669
V2EX  ›  Java

关于 Java 线程池并发查询的问题

  •  
  •   zhangslob669 · 2021-04-16 17:01:02 +08:00 · 2550 次点击
    这是一个创建于 1310 天前的主题,其中的信息可能已经有所发展或是发生改变。

    example

    public void test() throws Exception {
            ExecutorService threadPoolExecutor = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
            List<Future> futureList = new ArrayList<>();
            for (int i = 0; i < 10; i++) {
                Future future = threadPoolExecutor.submit(() -> {
                    System.out.println("do some work");
                    int time = new Random().nextInt(2000);
                    try {
                        Thread.sleep(time);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
                futureList.add(future);
            }
    
            for (Future future : futureList) {
                future.get(1000, TimeUnit.MILLISECONDS);
            }
        }
    

    我想在 1 秒内,批量查询,如果某次查询超时,就不要结果。最后获取所有成功的查询结果

    现在的写法是有问题的,每次从 futureList 获取结果都是阻塞的,最终结果肯定是大于 1 秒的,有没有好办法或者轮子?

    15 条回复    2021-04-17 18:28:55 +08:00
    Ariver
        1
    Ariver  
       2021-04-16 17:04:43 +08:00
    你需要 Reactor.
    DanielGuo
        2
    DanielGuo  
       2021-04-16 17:06:22 +08:00
    public void test() throws Exception {
    ExecutorService threadPoolExecutor = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    List<Future> futureList = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
    Future future = threadPoolExecutor.submit(() -> {
    System.out.println("do some work");
    int time = new Random().nextInt(2000);
    try {
    Thread.sleep(time);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    });
    futureList.add(future);
    }
    Thread.sleep(1000);
    boolean allDone = futureList.stream().map(f -> f.isDone()).allMatch(result -> result == true);
    if (allDone) {
    for (Future future : futureList) {
    future.get();
    }
    }

    }
    DanielGuo
        3
    DanielGuo  
       2021-04-16 17:07:04 +08:00
    等待一秒,判断是否全部完成,然后获取结果。。。
    guxingke
        4
    guxingke  
       2021-04-16 17:08:08 +08:00
    CompletableFuture.allOf(f1,f2...fn).get(timeout)

    > 也许可以,没验证
    zhangslob669
        5
    zhangslob669  
    OP
       2021-04-16 17:09:53 +08:00
    @DanielGuo 这样就必须强制等待了,并不是一种优雅的做法;而且项目里不允许写 Thread.sleep(1000);等代码
    dqzcwxb
        6
    dqzcwxb  
       2021-04-16 17:19:01 +08:00
    @guxingke #4 Completablefuture 可行
    securityCoding
        7
    securityCoding  
       2021-04-16 17:19:50 +08:00
    Completablefuture 直接用这个
    SlipStupig
        8
    SlipStupig  
       2021-04-16 17:21:51 +08:00
    结果用异步回调
    xiaoxinshiwo
        9
    xiaoxinshiwo  
       2021-04-16 17:33:37 +08:00
    CountDownLatch 不香吗
    blisteringsands
        10
    blisteringsands  
       2021-04-16 19:42:10 +08:00
    submit()之后取一下当前时间,续 1 秒算出 deadline
    每次 future.get 之前重新取当前时间,和 deadline 减一下算出等待时间
    zzl22100048
        11
    zzl22100048  
       2021-04-16 19:56:01 +08:00 via iPhone
    你这要求完美符合 completablefuture
    zhady009
        12
    zhady009  
       2021-04-16 19:59:52 +08:00
    CompletableFuture 有个 completeOnTimeout 超时的时候可以设置默认值给个 null
    最后过滤掉为 null 的
    zhady009
        13
    zhady009  
       2021-04-16 20:09:49 +08:00
    ```java

    @Test
    public void demo() {
    QueryTask var0 = new QueryTask(900);
    QueryTask var1 = new QueryTask(2100);
    QueryTask var2 = new QueryTask(2000);
    QueryTask var3 = new QueryTask(2000);

    Demo<QueryTask, Integer> test = new Demo<>(1000, List.of(var0, var1, var2, var3));
    long l = System.currentTimeMillis();
    Collection<Integer> d = test.execute();
    System.out.println(System.currentTimeMillis() - l);
    assert d.size() > 0;
    for (Integer integer : d) {
    assert integer <= 1000;
    }
    }
    static class Demo<T extends Supplier<E>, E> {
    private static final ExecutorService ES = Executors.newFixedThreadPool(10);
    private final int timeout;
    private final Collection<T> tasks;
    Demo(int timeout, Collection<T> tasks) {
    this.timeout = timeout;
    this.tasks = tasks;
    }
    public List<E> execute() {
    List<CompletableFuture<E>> collect = tasks.stream().map(x -> CompletableFuture.supplyAsync(x, ES)
    .completeOnTimeout(null, timeout, TimeUnit.MILLISECONDS))
    .collect(Collectors.toUnmodifiableList());

    CompletableFuture<List<E>> listCompletableFuture = CompletableFuture.allOf(collect.toArray(new CompletableFuture[collect.size()]))
    .thenApply(v -> collect.stream().map(CompletableFuture::join)
    .filter(Objects::nonNull)
    .collect(Collectors.toList()));
    return listCompletableFuture.join();
    }
    }
    static class QueryTask implements Supplier<Integer> {
    private final int time;
    QueryTask(int time) {
    this.time = time;
    }
    @Override
    public Integer get() {
    try {
    //query
    Thread.sleep(time);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    return time;
    }
    }

    ```
    zhgg0
        14
    zhgg0  
       2021-04-17 00:37:05 +08:00 via iPhone
    用线程池的 invokeAll 方法。
    或者 timeout 每次 get 前实时算。
    yazinnnn
        15
    yazinnnn  
       2021-04-17 18:28:55 +08:00
    val list = (0..9).map {
    async {
    withTimeoutOrNull(1000) {
    val long = Random.nextLong(2000)
    delay(long)
    it
    }
    }
    }
    println(list.map { it.await() })


    [0, 1, 2, 3, null, 5, 6, null, null, 9]

    kotlin 协程可以简单实现...

    或者 jdk11 用 CompletableFuture

    或者 jdk8 用一下 vertx 的 Promise api...


    fun main() {
    println("start ${Date()}")
    foo()
    println("end ${Date()}")
    }

    var threadPoolExecutor = ThreadPoolExecutor(10, 10, 0L, TimeUnit.SECONDS, LinkedBlockingQueue())

    fun foo() {
    val future = (0..9).map { getSomething() }
    println(future.map { it.result() })
    Thread.sleep(1000)
    println(future.map { it.result() })
    }


    fun getSomething(): Future<String> {
    val promise = Promise.promise<String>()
    threadPoolExecutor.execute {
    Thread.sleep(Random.nextLong(1500))
    val result = Random.nextLong(3000).toString()
    promise.complete(result)
    }
    return promise.future()
    }


    start Sat Apr 17 17:56:12 CST 2021
    [null, null, null, null, null, null, null, null, null, null]
    [2255, null, 2370, 750, 1399, 2796, null, null, 39, null]
    end Sat Apr 17 17:56:13 CST 2021

    不过这样无法取消任务...
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2705 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 25ms · UTC 11:53 · PVG 19:53 · LAX 03:53 · JFK 06:53
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.