背景:
这是一个批量接口,目的是接口接收数据,数据交给协程存入 Kafka ,接口立即响应成功,越快越好,请求频率每秒 70 次,一次请求数组携带 1000 条数据。
压测的时候发现这个接口使用协程会导致服务内存暴涨,昨天查了一下午也没有头绪,即使什么也不做,只打印个数组元素的长度,发现如果不使用协程直接 for 循环处理 requestData 数据,内存就不会上涨,但这样会影响接口的响应速度,使用协程处理的话内存会立即上涨,这是为什么呢? 如果开启协程占用了内存,可是协程只有几 KB 呀,若协程内的数据处理占用,那不用协程也占用了内存啊,我的理解用了协程处理能力应该会提高才对啊,为什么反倒下降了呢?附上 pprof 内存图
func PushBatch(c *gin.Context) {
appGin := app.Gin{C: c}
gameId := c.Query("game_id")
if gameId == "" {
// 记录错误日志···
return
}
// 读取请求体
bodyBytes, err := io.ReadAll(c.Request.Body)
if err != nil {
// 记录错误日志···
return
}
// JSON 反序列化
var requestData []map[string]interface{}
if err := json.Unmarshal(bodyBytes, &requestData); err != nil {
// 记录错误日志···
return
}
// 1.没有使用协程
for _, entry := range requestData {
fmt.Println(len(entry))
// Todo 数据写入 Kafaka
}
// 2.数据放入协程
//go func(requestData []map[string]interface{}) {
//
// for _, entry := range requestData {
// //handleEntry(gameId, entry)
// fmt.Println(len(entry))
// }
//}(requestData)
// 返回状态 200
appGin.Response( http.StatusOK, e.SUCCESS, nil)
}
1
lifei6671 316 天前
使用了协程后,HTTP 请求会立即返回,如果 qps 很大或持续时间很长,会导致你的协程数量暴增,且每个协程都会保持一个 map 数据,当然会导致内存暴增了。
建议使用协程池,保持一定的协程数量,每个协程用来处理写 kafka 的数据,这样可以控制协程数量不会暴涨。还有你这个 json 序列化也有问题,建议直接序列化为结构体,可以使用流式 json 解析,而不是 read 所有数据。 |
2
GooMS 316 天前 via Android
其次携程也是要回收的,处理好错误和超时
|
3
zsj1029 316 天前
都已经用 gin 框架了,自带协程池处理请求,没必要自己写协程处理了,func 里面写正常的 Kafka insert 逻辑即可,一秒上千条的处理很轻松的
|
4
zdt3476 316 天前
你有看过请求测试的时候 goroutine 的数量吗?大概率就是开了太多 goroutine 了。你固定开几个 goroutine 执行 kafka 写入的操作。PushBatch 这个接口通过写入一个带 buffer 的 channel 把数据投递到前面的 goroutine 中就好了。
|
5
iyaozhen 316 天前
你不要说多少条,一条多大呢
你说的暴涨是多大呢,这个预期就是会暴涨的,不要说 2G 到 4G 。 kafka 的吞吐到了多少呢,这里卡住也会影响性能的 我猜你这个服务是接收日志或者打点上报的。我之前拿 php-Swoole 做过,3-4w qps 没有问题 你给的信息太少了,大概猜测下你说的问题: 1. 发现如果不使用协程直接 for 循环处理 requestData 数据,内存就不会上涨 那是因为这样接口耗时会增加,这样发压端压力上不来。不知道你是什么发压模型 2. 使用协程处理的话内存会立即上涨,这是为什么呢? 同 1 ,耗时下降,这样单位时间内,你收到的请求更多了,请求 data 都在内存里,可不就上涨了 我的建议是,发送到 kafka 就不要协程了,让这点消耗体现在接口耗时上 |
6
sujin190 316 天前
好像 golang 的 goroutine 调度并不是平衡调度,所以并发很高接口提交速度这么快的话,消费不足肯定导致大量数据拥塞在内存中内存使用量肯定高了,不过你可以试试看其实应该没有一直涨,看你这量不平衡导致内存使用估计得接近 10G 级别了吧,线程的调度平衡性要好的多,但是量特别大也是要考虑线程调度的平衡性影响的
但是换个方向如果你生成业务提交数据不是一直这么高的话,内存充足无所谓的吧,否则如果一直都这么高,或者对内存使用有极度需求,那么你使用 goroutine 提交到 Kafaka 并不能提高你整个系统的吞吐,整体来看对接口延时也帮助不大,整体还是受限于内存大小和 Kafaka 写入速度,异步 goroutine 提交没啥用吧,多余了 |
7
fruitmonster OP 确实,找到问题了,大量且时间长的请求,协程中的 for 循环会把 CPU 跑满,导致一直积压,不能及时处理,但请求依旧,依旧在启动新的协程,导致内存暴涨。因为接口的请求体字段是不固定的,不知道数量,不知道类型,所以我就想的是使用了 map
```go // 2.数据放入协程 go func(requestData []map[string]interface{}) { for _, entry := range requestData { //handleEntry(gameId, entry) fmt.Println(len(entry)) } }(requestData) ```` |
8
fruitmonster OP @GooMS 确实,协程一直在启动 CPU 跑满,协程不能被正常释放,一直在处理 for 循环的数据
|
9
FreeEx 316 天前
1. 删除协程。
2. 发送 kafka 改成异步批量发送。 |
10
fruitmonster OP @iyaozhen
一次请求,数据体的大小大概 500kb 暴涨就是从服务启动的几十兆,到 7 G 到 10G ,只要给压力会持续暴涨,无限上涨,若服务不停,内存用完为止 您猜测的没错,之前我的目的是为了接口尽可能快的返回数据,我就把数据丢给了协程,让携程去处理,并且在协程中愚蠢的增加了验证、解析的逻辑,等把这部分数据整理完之后再写入 kafka ,问题就出在整理这里,目前发现的问题是:当请求量非常大,原有代码的 for 循环,本身会占用 CPU 的能力,大量请求把 CPU 全部跑满,CPU 处理不了,数据就会在内存中越积越多,所以内存就会上涨 |
11
fruitmonster OP @sujin190 是的,您分析的没错,我之前把数据交给协程去处理,因为我想在协程里加一些验证,能过通过验证才会写入 Kafka ,现在看来我完全是错的
|
12
fruitmonster OP @zsj1029 大哥,请明示,没找到 Gin 自带协程池处理请求的相关内容哇
|
13
kkbblzq 316 天前
你这写的有点无力吐槽。。
1. 你这样开协程只是提前返回罢了,对处理效率并没有改进,完全没有必要开,你开了无非是多了一堆挂在后台跑的协程,内存不大才怪了。真开协程应该是按 kafka 单包大小进行分片,比如 100 条数据一个协程发送,且应该开 wg 等协程跑完再返回的。 2. 既然你不需要了解结构,单纯解数组的话,[]json.RawMessage 就可以了,何必解成 map ,你发 kafka 又得序列化回去,反复的消耗 cpu 何必呢。 ... |
14
fruitmonster OP @kkbblzq 条条在理,正在改
|
15
leyoumake1997 316 天前
将请求数据写入到队列,然后另外一个服务去消费到 kafka 里面
|
16
leonshaw 316 天前
读 request body 时就已经占了一部分内存了,不开新协程时是因为没有提前返回导致并发数受到了限制,确认一下这种情况下并发瓶颈是在客户端还是服务端。你需要的是在服务端读 request body 之前限制并发。
|
17
lsk569937453 315 天前
这是一个批量接口,目的是接口接收数据,数据交给协程存入 Kafka ,接口立即响应成功,越快越好,请求频率每秒 70 次,一次请求数组携带 1000 条数据。单次请求数据大小 500kb 。
============================ 先说最简答的方案,加机器。后端服务本来就是无状态的,kafka 也绝对不是瓶颈。而且你自己也说了后端服务内存压力很大,那直接加机器就好了。 其次,在不加机器的情况下,你的代码还有可以继续优化的空间。后端收到请求后不校验,直接完整的将 http 请求写入 kafka ,省去序列化( http 请求到 struct)和反序列化(struct 到 kafka 的 body)的 cpu 。由后续的消费者去一次消费 1000 条消息做处理/校验。 技术没有银弹,完全看你怎么取舍。以前我可能采取第二种方案,现在年龄大了,不想折腾了,我只想会加机器。 |
18
PiersSoCool 315 天前
我处理每秒最高十几万 QPS ,这种不用开协程,直接用 gin 就好;这里直接用 k8s 扩容即可,甚至可以压测出 QPS 按照 QPS 自动扩容
confluent kafka golang 那边,以及 kafka 公有云上会有 bug ,小心踩坑 |
19
fruitmonster OP @PiersSoCool 请问怎么直接用 Gin 啊,我新手,没听太明白,谢谢啦~
|
20
PiersSoCool 312 天前
@fruitmonster gin 直接处理请求,和上面类似的不开协程,gin 每个 http 请求本身就是个协程,直接往 kafka 写数据
|