情况如下: 有 N 个任务,每个任务执行完都会返回结果或者 error,通过固定的(M)协程去执行,如果其中有一个任务返回 error 时立即结束,否则全部执行完成时返回结果列表。
我自己写了一版,感觉有点复杂:https://play.golang.org/p/ono1S04XupK
不知道各位有没有什么更简单的实现。
1
ToPoGE 2021-02-08 12:16:33 +08:00 via Android
context 不就可以实现吗
|
2
cloudfstrife 2021-02-08 12:26:53 +08:00 via Android
errgroup 了解一下
|
3
xylophone21 2021-02-08 12:30:21 +08:00
一起探讨,如果 M 不是大到离谱,是否需要控制 M ?还是无脑 go 就好了,因为这样感觉就是 go 程池 /线程池了,有点像把 go 程当线程用了?
|
4
sunorg 2021-02-08 12:44:23 +08:00
不是 channel 就可以实现了吗?
|
6
monkeyWie OP @xylophone21 现在就是 M 如果太大了服务器会顶不住,比如并发查询 sql
|
7
monkeyWie OP @cloudfstrife errgroup 我试了下,第一不能限制并发数,第二不能在发生错误时立即返回。
|
8
baodaren8 2021-02-08 13:35:46 +08:00
借楼问一下,我怎么不能发帖了。。
|
11
MadbookPro 2021-02-08 14:00:54 +08:00
一个 err chan 用来通知 root goroutine 发生错误,然后 root context 退出; wait group 用来做全部完成检查。
这样何如? |
12
fatedier 2021-02-08 14:04:21 +08:00
|
13
MadbookPro 2021-02-08 14:04:47 +08:00
哦还有并发控制。
taskChannel + workerChannel,这样 wait group 也不需要了 |
14
mogg 2021-02-08 14:23:54 +08:00
线程池+channel,收到错误消息停止向队列里发送消息。
想要立刻回收,主线程直接 stop 线程池里所有线程,否则等待运行中线程跑完 |
15
monkeyWie OP |
16
LoNeFong 2021-02-08 14:29:07 +08:00
errgroup + 1
|
17
Claar 2021-02-08 14:37:21 +08:00 via iPhone
并发控制:协程池
消息传递:channel 还有等待协程结束再结束 main 函数:sync. 甚至可以子任务报错直接 exit |
18
ToPoGE 2021-02-08 14:38:59 +08:00
@monkeyWie 那你用 errgroup 把,go 官方标准库中的,看文档直接用,就是 context 和 waitGroup 结合的
|
19
ppphp 2021-02-08 14:48:36 +08:00
看不到代码。。。我的思路是,for 循环几个 goroutine,select 的时候,先从 errchannel 里看 err,然后 default 里从 taskchannel 里拿 task
|
20
mogg 2021-02-08 15:48:18 +08:00
```go
var wg sync.WaitGroup pool := makePool(pollSize) func() { for i := 0; i < runTimes; i++ { select { case <-errChan: fmt.Println("error") pool.Stop() return default: wg.Add(1) poll.Submit(task) } } }() if pool.IsRunning() wg.Wait() ``` 我觉得核心就是这样一个结构,不过 go 没用过线程池,可以看看有什么库 |
21
KaynW 2021-02-08 15:53:11 +08:00
|
22
caiych 2021-02-08 16:05:58 +08:00
一个比较简单的实现
https://play.golang.org/p/MDU_x7C2npI 如果有一个 routine 有 error,ctx 会 Done 如果 ctx 已经 Done 了,semaphore 会 error |
23
wpf375516041 2021-02-08 17:03:40 +08:00
package main
import ( "fmt" "net/http" ) /*有 N 个任务,每个任务都会返回结果或者 error,通过固定的并发数(M)去执行。 如果其中有一个任务返回 error 时立即结束,否则全部执行完成时返回结果列表*/ func main() { n := 10 m := 5 result := make([]string, n) limitCh := make(chan interface{}, m) errCh := make(chan error) doneCh := make(chan interface{},1) defer func() { close(limitCh) close(errCh) }() for state, i := true, 0; i < n; i++ { state = true for state { select { case limitCh <- nil: fmt.Printf("开始第%d 个任务\n", i) go func(i int) { var err error defer func() { if i == n-1 { close(doneCh) } if err != nil { errCh <- err } <-limitCh }() ret, err := doTask() if err != nil { return } result[i] = ret }(i) state = false case <-errCh: return default: } } } <- doneCh fmt.Println(result) } func doTask() (string, error) { // 模拟执行任务 resp, err := http.Get("https://www.baidu.com") if err != nil { return "", err } defer resp.Body.Close() return resp.Status, nil } |
24
monkeyWie OP @caiych #22 这个做不到有一个 task 发生 error 立即结束,例如: https://play.golang.org/p/sqlMbgW7z9Z
|
26
monkeyWie OP @wpf375516041 #23 这个好像也有点问题哦,就是判断任务全部执行完成的地方
``` if i == n-1 { close(doneCh) } ``` 这里判断最后一个任务执行完成就结束,但是可能会存在还有正在执行的任务并且比最后一个任务执行还慢,就不对了。 |
27
caiych 2021-02-08 18:54:42 +08:00
@monkeyWie 你仔细看一下打出来的内容,第一批开始执行的任务里没有 1 ( 1 没有抢到信号量)。当 1 抢到信号量开始执行后整个程序都结束了(其他的任务都取消了,不然如果 i 那个循环大的话整个程序会运行很久)
|
28
rrfeng 2021-02-08 19:36:10 +08:00
1. 声明一个 buffered chan 当做 goroutine 池子 。
2. 启动 goroutine 的时候传入 context 用做取消(前提是你的 goroutine 任务可以取消)。 |
29
ginjedoad 2021-02-08 20:29:53 +08:00
兄弟,errgroup 就是为了你这个场景设计的。不要重复造轮子了
|
30
ginjedoad 2021-02-08 20:30:42 +08:00
说只要一个错误不能马上中断返回的,自己好好审计一下 errgroup
|
31
monkeyWie OP @caiych #27 错误发生时不会立刻结束,而是会等正在执行的任务全部完成才返回,你可以跑这个试试: https://play.golang.org/p/66Me2TYbVoK
错误发生了也要等 5 秒才结束。 |
33
wpf375516041 2021-02-08 22:55:03 +08:00
@monkeyWie 是滴 用 waitgroup 大家说的都对 其实你把原来的封装下 搞个协程池 代码就清晰了
|
34
wpf375516041 2021-02-08 23:16:58 +08:00 1
我觉得这是个很好的面试题,既有实际意义也考验基本功,大家可以试试不用三方库实现一下~
talk is cheap, show me the code 一起娱乐娱乐,新年快乐~! 1. 控制并发 2. 等待所有任务返回 3. 一个任务错误,立刻结束 如果不解耦,并发控制和结果处理的逻辑混杂确实屎 go 实现的时候想当然了,只以最后提交的任务判断是否结束 kotlin 协程实现的时候发现 io,计算任务无法退出,必须要手动捕捉中止信号 java 须要手动捕捉中止信号 但是可以通过 thread.stop()强制停止,另外判断线程是否异常退出较难 |
35
caiych 2021-02-09 00:06:49 +08:00 2
@monkeyWie
最开始的版本里注释有写 DoWork 里的操作需要支持 context cancellation (比如如果操作是 http.Get 的话,可以使用 http.NewRequestWithContext) 这个里面实现了一个如果调用的操作不支持 context cancellation 的情况 https://play.golang.org/p/Cot1FYgIKLd |
36
ooh 2021-02-09 00:11:53 +08:00
|
37
monkeyWie OP @wpf375516041 #34 哈哈,搞不好会加入大厂面试题库
|
38
monkeyWie OP @caiych #35 实际上很多 work 是不支持 cancel 的,而且也不一定要 cancel 掉,只要不阻塞主协程就行了,发送错误的时候主协程继续执行,其它正在执行的任务让它继续跑。
|
40
guonaihong 2021-02-09 09:51:10 +08:00
难道 slice+context.Context+errgroup 的组合不行?
1.分配 M 容量的 slice 。放 M 个 go 程正常运行的结果。每个 go 程都有自己的 index,所以存结果这块都不要加锁,相当舒服的操作。 2.context 当作异常终端点。每个 go 程都持有这个 context 变量。任意一个 go 程错误,cancel 。任意 go 程检查 ctx.Done()。所谓 “如果其中有一个任务返回 error 时立即结束” 完美实现。 3.检查 errgroup 的返回,err != nil,就返回错误,else 部分没有错误,返回第 1 步声明的 slice |
41
crclz 2021-02-09 09:54:50 +08:00
分解问题:
1. 通过固定的 M 个携程执行。 解决方案:信号量或者条件变量或者 channel,很多种方法实现。 2. 只要出现一个任务返回 Error,就立即结束全部任务。 解决方案:C#的 TAP 的 Task.WhenAny 和 CancellationToken 模式可以很好的解决这些问题。 那么只需要用 go 来实现 Task.WhenAny 和 CancellationToken 模式即可。 CancellationToken 很好实现,用 struct {IsCancellationRequired: bool}就可以实现。 Task.WhenAny 可以用如下方式实现:有一个 chan ch,每个任务完成后,往 ch 里面写一个数字(或者写入任务信息),同时主线程阻塞读取 ch 。 |
42
monkeyWie OP @guonaihong #40 应该行的,但是对第一点有点疑问,用 slice 怎么实现 M 个协程的限制呢
|
43
guonaihong 2021-02-09 10:01:11 +08:00
//放结果伪代码
// 每个 go 程的 id 已经固定下来,就是 for 循环启动的 index.大家操作自己的私有 index 。为啥会有竞争? for i :=0;i < M;i++ { i:=i go func(){ slice[i] = result } } |
44
jackeliang 2021-02-09 10:09:47 +08:00
@monkeyWie 发生错误不能立即返回的,需要等待其它协程结束才行,不然协程泄露了。
|
45
aeli 2021-02-09 10:11:36 +08:00
所有在并发里要求错误立即中断所有并发返回的,显然是脑抽抽
|
46
dawniii 2021-02-09 10:13:53 +08:00
当其中有一个协程有错误,另一个协程在密集计算,应该是没法打断直接返回的吧
|
47
SignLeung 2021-02-09 11:10:36 +08:00 1
楼上说的没错,协程好像只能自己结束
|
48
seth19960929 2021-02-09 11:11:06 +08:00
N 个的容量的 success channel.
再来一个 err channel 然后主线程那里 select 这两个 channel 做事情就可以啦. ---------------------- 至于你纠结 err 之后协程能不能关闭, 那个不是你关心的事情了. 可以考虑传递一个 context 给 request, err 发生错误的时候进行 cancel context 即可. |
49
liyunlong41 2021-02-09 11:24:29 +08:00
https://play.golang.org/p/YYvynelzIHj
感兴趣写了下,如果想出现错误后中断其他 goroutine 的处理,其他 goroutine 必须可以被 cancel 掉。 |
50
asAnotherJack 2021-02-09 12:47:03 +08:00
errgroup 可以实现出错时结束流程,前提是你的代码实现了 cancel 逻辑
|
51
Aoang 2021-02-09 14:47:03 +08:00 via Android
并发限制用协程池,退出机制需要自己实现。
如果懒得实现,就直接摘出来,用系统的实现。main 启动之后开一个协程监听退出信号,收到退出信号之后直接 os.Exit() 其实还是应该自己在代码中实现出来,那个操作只能玩玩。看看源码,用 runtime.Goexit() 来终止协程。 errgroup context 都是用来传递消息的,并不是来做终止的。你用这两个来做的话,只能每进行一步就检查一下是否有退出信号,不然就做不到及时退出。 |
52
teawithlife 2021-02-09 14:48:32 +08:00
@liyunlong41 #49 你好,请教一下,在 61 行的这一部分,为什么两个 channel 之间互相要写数据?
``` select { case err := <-errCh: handleErr(err, result[1:]) <-done case <-done: if len(errCh) > 0 { err := <-errCh handleErr(err, result[1:]) return } fmt.Println("success handle all task:", result[1:]) } ``` 改成这样是否可以? ``` select { case err := <-errCh: handleErr(err, result[1:]) case <-done: fmt.Println("success handle all task:", result[1:]) } ``` |
53
Aoang 2021-02-09 15:01:42 +08:00 via Android
|
54
liyunlong41 2021-02-09 15:28:38 +08:00
@teawithlife 没有相互写数据,是为了一些健壮性考虑吧。第一个 case 如果从 errCh 中读到 err 了,<-done 表示等待其他 goroutine 也都结束,是怕有 goroutine 泄露。第二个 case 是考虑可能有极小概率 errCh 和 done 同时有数据,select 随机选择了 done channel,所以最终再判断下 errCh 是否有 err 。
|
55
monkeyWie OP @liyunlong41 #49 目前 35L 这种应该是最优雅的实现,我们用纯标准库实现的还是太复杂了哈哈
|
56
teawithlife 2021-02-09 20:55:51 +08:00
@liyunlong41 #54 “相互写数据”这个表述不太严谨。不过你说的有道理,从健壮性来说,确实需要考虑这些极端情况。
@monkeyWie #55 个人觉得 35L 的写法确实比较优雅,但是从效率来说,还是 49L 的写法更好一些,因为 49L 的协程数量是 M 个,而 35L 的协程数量是 N 个,当 N>>M 时,虽然 golang 的协程足够轻量,但是也没必要这么浪费。 |
57
monkeyWie OP @teawithlife #56 35L 协程数量其实是 N 个,用信号量做了控制的
|
59
teawithlife 2021-02-10 08:29:04 +08:00
@monkeyWie #57 协程有 N 个,只不过通过信号量保证了其中只有 M 个在跑,其他的协程虽然在等待,也需要消耗少量资源
|
60
Nillouise 2021-02-10 09:52:31 +08:00
我之前研究过 go 的流控怎么写,感觉还是流控好,流控还可以控制几秒才做一个任务,单纯控制并发感觉不如流控。
|
61
monkeyWie OP |
62
MilletChili 2021-02-10 11:23:42 +08:00
感觉不需要写太多代码,直接用 channel 控制就好了
https://play.golang.org/p/JiD2EopqPkL |
63
monkeyWie OP @MilletChili #62 这种思路好像也不错,不过如果要加上参数传递和结果、错误返回也还是挺复杂的
|
64
monkeyWie OP @MilletChili #62 可以尝试下用这种思路实现下这里的 run 方法,https://play.golang.org/p/Be7vNF4JH4-
|
65
MilletChili 2021-02-10 13:57:42 +08:00
@monkeyWie 嗯嗯,真要项目搞,还是用一些开源包好点
|
66
troywinter 2021-02-10 16:42:15 +08:00
控制并发直接用 Semaphore 就行了,至于遇到错误怎么退出 goroutine 应该是你的业务代码自己实现,不同业务场景需要有不同的处理。
|
67
YouLMAO 2021-02-11 22:50:02 +08:00 via Android
errgroup.withcontext 楼主是不是没学过 go 呀? 这样只要一个 err,context 就会取消,全部都返回了
|
68
tolerance 2021-02-13 00:37:32 +08:00
把 channel 关了就可以
|
69
kevinwan 2021-02-17 13:09:27 +08:00 via iPhone
go-zero 下有个 mr 包解决这种场景,遇到 error 可以 cancel 所有任务
|
70
abccccabc 2021-02-19 10:21:51 +08:00
各位问一下,你们是怎样看 play.golang.org/p/xxxxxx 代码的?难道要 pa 强??
|