V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
wenbingkun
V2EX  ›  Java

Java 同时调用 5000 个第三方接口并异步返回每个接口的执行结果,如何实现?

  •  
  •   wenbingkun · 2022-09-27 15:39:40 +08:00 · 7571 次点击
    这是一个创建于 795 天前的主题,其中的信息可能已经有所发展或是发生改变。
    同时调用 5000 个第三方接口,异步获取每个接口执行结果,有什么好的 Java 实现思路吗?
    每个接口的执行时间大概在 5 分钟以上
    第 1 条附言  ·  2022-09-27 16:46:18 +08:00
    实际需求是对多台服务器进行固件更新的一个功能,最多可能有 5000 台,Java 调用 bmc 的固件更新接口,这个接口固件更新完确实要 5 分钟左右,每一台服务器 ip 不同所以说相对于是 5000 个接口
    81 条回复    2022-12-25 11:22:35 +08:00
    hex2en
        1
    hex2en  
       2022-09-27 15:40:53 +08:00
    直接用异步会有问题吗?
    wenbingkun
        2
    wenbingkun  
    OP
       2022-09-27 15:49:37 +08:00
    @hex2en 还没开始写,所以不知道会不会有什么问题,想问下大家有没有好的思路
    Red998
        3
    Red998  
       2022-09-27 15:50:56 +08:00   ❤️ 5
    外星接口 外星需求
    huanglongtiankon
        4
    huanglongtiankon  
       2022-09-27 15:56:34 +08:00
    不是,整整跑 5 分钟到接口是什么鬼,你这个接口本身不做异步的吗,真正要做异步的是你这个要跑整整 5 分钟到接口吧
    hex2en
        5
    hex2en  
       2022-09-27 15:56:48 +08:00
    @wenbingkun 自定义一个线程池,直接搞异步先写再说。不过直接请求 5000 个接口这种情况,还是先考虑下设计有没有问题吧
    muchenlou
        6
    muchenlou  
       2022-09-27 15:58:27 +08:00   ❤️ 1
    不能使用回调的方式,去接受执行结果吗?
    Jooooooooo
        7
    Jooooooooo  
       2022-09-27 16:01:17 +08:00
    先考虑一下需求和实现的合理性?

    5000 个调用可能内存要爆了.
    zmal
        8
    zmal  
       2022-09-27 16:02:42 +08:00
    AIO 、kotlin 协程,akka
    Vegetable
        9
    Vegetable  
       2022-09-27 16:07:17 +08:00
    什么逆天需求。

    本身如果不考虑异常情况,这需求 2 个和 5000 个没什么区别,直接用多线程未尝不可,不过线程太多了很不划算,找个协程方案就行了,kotlin 挺好的。
    xuelu520
        10
    xuelu520  
       2022-09-27 16:09:50 +08:00
    我也好奇这是什么需求?
    ourslay
        11
    ourslay  
       2022-09-27 16:12:13 +08:00 via iPhone
    做好重试,可以试试 loom 虚拟线程很丝滑
    tramm
        12
    tramm  
       2022-09-27 16:13:12 +08:00
    Forest 试试呢
    wenbingkun
        13
    wenbingkun  
    OP
       2022-09-27 16:15:19 +08:00
    @hex2en 目前是准备先自定义线程池然后异步

    @huanglongtiankon 确实要跑这么久😂

    @muchenlou Callable 应该也可以
    oneisall8955
        14
    oneisall8955  
       2022-09-27 16:20:22 +08:00 via Android
    把这需求砍了吧
    edotac
        15
    edotac  
       2022-09-27 16:22:54 +08:00
    一般是第三方接口做回调...
    mingsz
        16
    mingsz  
       2022-09-27 16:25:29 +08:00
    控制好并发吧,一个请求 5 分钟到时候别第三方崩溃了
    unco020511
        17
    unco020511  
       2022-09-27 16:26:40 +08:00
    第三方没有提供回调形式吗
    li24361
        18
    li24361  
       2022-09-27 16:26:54 +08:00
    考虑用 mq 呗
    v2eb
        19
    v2eb  
       2022-09-27 16:29:34 +08:00 via Android
    详细说下需求吧, 真想看看, 🐒
    angryfish
        20
    angryfish  
       2022-09-27 16:38:26 +08:00
    详细说下需求,很感兴趣,这个五分钟的接口是干嘛的
    allenzhangSB
        21
    allenzhangSB  
       2022-09-27 16:40:28 +08:00
    你这是拿面试题来问的吧
    liuymf
        22
    liuymf  
       2022-09-27 16:41:22 +08:00
    @angryfish 瞎猜是爬虫....
    wenbingkun
        23
    wenbingkun  
    OP
       2022-09-27 16:46:53 +08:00
    @v2eb
    @angryfish
    @allenzhangSB
    @liuymf 详细需求已附言
    superchijinpeng
        24
    superchijinpeng  
       2022-09-27 16:48:16 +08:00
    建议滚动更新
    abc0123xyz
        25
    abc0123xyz  
       2022-09-27 16:49:27 +08:00
    moshiyeap100
        26
    moshiyeap100  
       2022-09-27 16:49:41 +08:00   ❤️ 1
    CompletableFuture 异步线程池,最后汇总下执行结果。
    liuymf
        27
    liuymf  
       2022-09-27 17:04:00 +08:00
    @wenbingkun

    1.把个更新服务单独出去部署; (避免批量调用的时候影响其它服务)
    2.时效性要求不是很严格的话 慢慢调试找出一个合理的调用数量; 比如一次 100 个 慢慢循环
    3.时效性要求搞的话; 集群: 10 台 每台 500 个
    dqzcwxb
        28
    dqzcwxb  
       2022-09-27 17:07:52 +08:00   ❤️ 1
    CompletableFuture+自定义 ForkJoinPool
    Kasumi20
        29
    Kasumi20  
       2022-09-27 17:14:56 +08:00
    lock set status(id, statu)
    lock get status(id, statu)

    for 1..5000 {
    thread {
    set status(id, false)
    await call api
    set status(id, true)
    }
    }
    registerrr
        30
    registerrr  
       2022-09-27 17:17:39 +08:00
    搞个队列吧,5000 个一起上,虽然可能也能行,但总是觉得有点莽
    allenzhangSB
        31
    allenzhangSB  
       2022-09-27 17:24:03 +08:00
    这附言中的需求, 感觉更适合客户端定时扫接口判断是否需要更新固件, 然后更新后上报, 也可以用消息广播,
    即使是服务端调用过去, 也不需要阻塞 5 分钟, 设计成通知机制, 通知需要更新固件, 然后更新结果上报(或者服务端定时调接口获取更新结果)
    28Sv0ngQfIE7Yloe
        32
    28Sv0ngQfIE7Yloe  
       2022-09-27 17:37:20 +08:00
    为什么不是

    监听 + 广播 + 结果上报?
    wanacry
        33
    wanacry  
       2022-09-27 17:41:30 +08:00
    内存会不会爆
    jorneyr
        34
    jorneyr  
       2022-09-27 18:14:22 +08:00
    Go 的协程做这个比 Java 的线程更合适,或者升级到 Java 19 也支持虚拟线程了,性能提高很多。
    jorneyr
        35
    jorneyr  
       2022-09-27 18:15:52 +08:00
    @registerrr 搞个队列吧,5000 个一起上,虽然可能也能行,但总是觉得有点莽。
    感觉队列好像也不太好,楼主说每个接口的执行时间大概是 5 分钟,队列只能保证并发量,这样会导致总的运行时间非常长。
    treblex
        36
    treblex  
       2022-09-27 18:36:19 +08:00
    a 通知 b 开始更新
    5 分钟后
    b 通知 a 更新结果

    类似订单系统里那个回调流程
    darkengine
        37
    darkengine  
       2022-09-27 19:12:29 +08:00
    我建议换个思路。下发固件下载地址和固件的 md5 让设备自己下载,校验 MD5 之后自行更新,这样还能用上 CDN 。
    sutra
        38
    sutra  
       2022-09-27 19:17:34 +08:00
    “同时”是怎么一个定义。
    能否查询更新状态。

    使用生产者消费者模式,把这些 tasks 摊平,让消费者去执行更新和查询状态。
    CnpPt
        39
    CnpPt  
       2022-09-27 19:24:25 +08:00 via Android
    看出来回复里大多数人都没接触过 BMC ,我管理过 500+物理机,这是切实的需求,但是频次很低,我选择拆分主机,同时更新少量主机,战线拉长
    MrKrabs
        40
    MrKrabs  
       2022-09-27 19:27:23 +08:00
    开 5000 个线程,有问题再说
    kubylo
        41
    kubylo  
       2022-09-27 19:47:15 +08:00
    就直接 io 多路复用搞呗,这还需要考虑啥吗
    msaionyc
        42
    msaionyc  
       2022-09-27 19:56:49 +08:00 via iPhone
    分批做也可以,这种事情也没有要求你必须几分钟内全部做完吧

    另外我觉得设备上的更新接口也可以改下,触发之后立即返回,然后就开始更新,等个若干分钟之后再调用一次,如果已经是新版本了就返回已更新就可以了,如果之前失败了或者还没成功就果断时间重试。
    zifangsky
        43
    zifangsky  
       2022-09-27 20:54:52 +08:00
    典型的分布式处理场景,按机房维度分别部署多个实例处理就行了
    janus77
        44
    janus77  
       2022-09-27 20:59:21 +08:00
    没必要 5000 个同时开始吧?用线程池,哪个执行完了就空出线程再执行一个新的啊
    ration
        45
    ration  
       2022-09-27 21:15:47 +08:00 via Android
    调接口通知服务器更新,通知成功直接返回。服务器上应该监听更新完成的程序。更新完成后回调给另一个记录更新成功的接口。
    iseki
        46
    iseki  
       2022-09-27 21:30:09 +08:00 via Android
    异步了也分一下批,免得把机器带宽打满了,还是要传好久才能传完,先把一部分传好从不所有机器都挤牙膏强
    yeqizhang
        47
    yeqizhang  
       2022-09-27 21:30:54 +08:00 via Android
    我觉得 27 楼说的对。
    我觉得 future task 没啥用吧,task 没执行完,线程也是在排在线程池队列中,直接设置线程池超大的话,内存设置大点就行?
    ychost
        48
    ychost  
       2022-09-27 22:04:40 +08:00
    用 NIO 就好了,比如 WebFlux
    lchqfnu
        49
    lchqfnu  
       2022-09-27 22:06:58 +08:00 via iPhone
    DeferredResult
    是不是跟我这个问题类似
    https://www.v2ex.com/t/878147
    fox0001
        50
    fox0001  
       2022-09-27 22:23:36 +08:00 via Android
    如果每台服务器有开始执行的接口,也有接口可以获取执行的状态,包括正在执行、执行成功、执行失败。那就简单了。for 循环一遍所有开始执行的接口,再每隔 1 分钟或 30 秒遍历所有获取结果的接口,直到所有服务器都获取了执行完毕的结果。

    如果每台服务器的接口需要等待完成,才能获取结果,就只能是楼上的那些方案了。弄个线程池去逐批执行,或者
    @dqzcwxb #28 的方案,或者 Go 协程那些
    fanxasy
        51
    fanxasy  
       2022-09-27 22:56:30 +08:00
    vertx.io 解君愁
    bxb100
        52
    bxb100  
       2022-09-27 23:36:31 +08:00
    没有包袱的建议使用 19 的 virtual thread, 否则就是异步那一套
    jones2000
        53
    jones2000  
       2022-09-27 23:40:28 +08:00
    任意一台服务器把下好固件包存盘, 同一个网段的服务器, 先在自己内网找是否有匹配的固件包,有就在这台服务器上下载,没有再去外网调用第 3 放接口下。下完也存盘,供其他服务器下载。
    zzxgz
        54
    zzxgz  
       2022-09-28 00:30:01 +08:00
    用 ansible 会不会方便点?
    lingly02
        55
    lingly02  
       2022-09-28 00:53:50 +08:00 via iPhone
    用 ansible 把固件和更新程序发到 50 台机器上,每台机器负责更新 100 台。这样程序用简单的多线程就搞定了。只是要计算一下瞬时带宽,不要影响正常业务通信
    yhvictor
        56
    yhvictor  
       2022-09-28 02:10:30 +08:00 via iPhone
    5000 个线程并不会有任何问题,带来的内存开销并不大。
    如果 5000 个线程解决不了,nio ,fiber 一样解决不了。
    zeni123
        57
    zeni123  
       2022-09-28 06:13:05 +08:00 via iPhone
    可以 5000 个线程 因为同时被阻塞了 调度压力应该不打
    jorneyr
        58
    jorneyr  
       2022-09-28 08:23:02 +08:00   ❤️ 1
    @zzxgz Ansible 每个任务都会起一个进程,如果是耗时任务增加 -B -P 实时心跳检测的话每个任务还会多出 2 个进程,5000 个任务这会导致进程风暴吧。
    dzdh
        59
    dzdh  
       2022-09-28 09:06:30 +08:00
    难道不是生成批次 ,然后消费批次,然后最终页面通知吗
    byte10
        60
    byte10  
       2022-09-28 09:19:52 +08:00
    @kubylo 你看看评论区就知道要考虑啥,因为没人知道 NIO 呗,只知道多线程。
    @Jooooooooo 5000 个不算啥,5w 都不是问题,多路复用就可以了,NIO 。
    @Vegetable NIO 就可以了,不一定要协程。
    yogogo
        61
    yogogo  
       2022-09-28 09:24:05 +08:00
    固件升级不是应该目标机器自己定时来检查是否有新固件版本吗?然后有新版本自己升级
    neptuno
        62
    neptuno  
       2022-09-28 09:28:24 +08:00
    消息队列放 5000 个任务,多台机器慢慢消费吧
    nothingistrue
        63
    nothingistrue  
       2022-09-28 09:42:21 +08:00
    异步要全程异步才有用,“每个接口的执行时间大概在 5 分钟以上”这个同步过程,注定了你在其他地方怎么异步都没作用。所以,在上面这个同步过程无法更改的情况下,全程同步反而会更节省资源。就简单的单线程或多线程依次同步执行即可。找个好点的机器,5000 个线程同时执行也不是问题。事实上这 5000 个线程因为绝大部分时间都是在等接口返回结果,消耗不了多少 CPU ,只是需要一些内存。
    justRua
        64
    justRua  
       2022-09-28 09:49:27 +08:00
    用个 NIO 实现的异步 http client 的做就行了吧,就像 webflux ,IO 异步调用不需要太多的线程
    winglight2016
        65
    winglight2016  
       2022-09-28 09:57:06 +08:00
    别用 java 干这事儿了,还是 go 一把梭了。java 的 nio 和 reactive 国内案例太少了,不知道实际效果怎么样,flink 、spark 、airflow 这些倒是有用 akka 做任务调度,也许可以试试。
    angryfish
        66
    angryfish  
       2022-09-28 10:24:46 +08:00
    1.关注下机房总带宽是否够,如果不够,你还是不能一次搞 5 千个并发
    2.搞 manager+agnet 机器咯,manger 负责下发指令给 agent ,agent 负责 http 更新操作,理论上只要你 agent 够多,带宽够大,5 万个并发都不是问题。
    最后,别想这么多,直接干,出问题再想办法解决。
    INCerry
        67
    INCerry  
       2022-09-28 10:31:13 +08:00   ❤️ 1
    如果是 C#的话,这个需求可太简单了。
    const int length = 5000;
    var tasks = new Task<HttpResponseMessage>[length];
    for (int i = 0; i < length; i++)
    {
    tasks[i] = new HttpClient().GetAsync("https://www.baidu.com");
    }

    // 等待全部执行完
    var responseArrays = await Task.WhenAll(tasks);
    // 做余下处理就行了
    526326991
        68
    526326991  
       2022-09-28 10:33:55 +08:00
    可以尝试工作窃取算法处理
    NoKey
        69
    NoKey  
       2022-09-28 11:06:09 +08:00
    37 楼那个是通常的做法;然后,如果客户端做不到这点,你这里也无法做到 5000 个并发,就一批一批的做呗
    alsas
        70
    alsas  
       2022-09-28 11:46:01 +08:00
    这种活直接用 goroutine
    litguy
        71
    litguy  
       2022-09-28 12:58:32 +08:00
    让 5000 设备主动 pull 升级包就行了
    你折腾啥并发
    自己 pulll ,自己升级自己
    启动时候给你 report 一下自己版本就行了
    blankmiss
        72
    blankmiss  
       2022-09-28 14:22:20 +08:00
    @litguy 确实 我感觉这种更好
    dog82
        73
    dog82  
       2022-09-28 14:45:34 +08:00
    用 Go 协程写个小工具就行
    b2byco
        74
    b2byco  
       2022-09-28 15:58:22 +08:00
    Janino
    http://janino-compiler.github.io/janino/

    The ShippingCost class demonstrates how easy it is to use Janino as an expression evaluator.
    The ExpressionDemo class implements a command line-based test environment for the expression evaluator.
    The ScriptDemo class implements a command line-based test environment for the script evaluator.
    The ClassBodyDemo class implements a command line-based test environment for the class body evaluator.
    JohnBull
        75
    JohnBull  
       2022-09-28 20:39:48 +08:00
    shell 脚本就够用
    zzxgz
        76
    zzxgz  
       2022-09-29 00:09:30 +08:00
    @jorneyr 原来是这样的,你说得有道理,受教了。
    xuanbg
        77
    xuanbg  
       2022-09-29 08:01:45 +08:00
    @winglight2016 Netty:???我这是在哪里?
    mynameislihua
        78
    mynameislihua  
       2022-09-29 10:35:38 +08:00
    消息队列
    touchmii
        79
    touchmii  
       2022-11-02 23:38:55 +08:00
    就是自动升级主板 bios 吧,五分钟的升级不代表上传数据得花这么久时间呀,几秒钟传文件就完事了,然后轮询去检查固件更新是否完成,可以考虑分批次上传固件,检查更新状态单独处理,如果 java 不好处理可以使用 python 的协程网络库来单独处理固件上传的功能,其实这跟 python 爬虫很相似,都是并发的 tcp 客户端请求,netty 也能很好实现就能难度比较大。
    wenbingkun
        80
    wenbingkun  
    OP
       2022-11-03 09:28:15 +08:00
    @touchmii 上传固件包确实很快啊,但升级确实需要 3-5 分钟,现在是用 Java 的 CompletableFuture 并行处理的,已经基本实现功能了
    linvaux
        81
    linvaux  
       2022-12-25 11:22:35 +08:00
    这种不应该是做上报么,服务器接收到升级通知后,异步执行,升级之后正常上报状态,
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   1051 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 35ms · UTC 19:27 · PVG 03:27 · LAX 11:27 · JFK 14:27
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.