codedreamstar 最近回复了
我页面一直没刷新, 没看到你的回复, 继续按你的场景回复.
你的中间进程下发 B 本身就是有限流逻辑的吧. 否则按照 A 与中间进程通过 Redis 交互本身就会造成与中间进程对 B 这个链路的生产消费速率失调.
如果中间进程只是获取数据并下发应用 B, 不涉及对数据在加工, 直接把中间进程和 Redis 砍掉, 这个下发逻辑合并到消费者的消费逻辑.
如果中间进程负责数据加工再下发, 那就把 Kafka 的逻辑合并到那个中间进程的应用上, 再按照上述方法.
如果现在应用 A 和中间进程不能合并, 那么就把 Redis 砍掉, 使用同步调用 A-中-B 的方式, 中间失败重试就行, 或者 A 与中间进程不使用 Redis, 也换成 Kafka, 以免生产速率过高 Redis 爆内存, 中间进程与 B 使用上述方法, 超过 TPS 报错就重试, 中间加个等待步进或者限流器.
最好的情况就是沟通应用 B, 让 B 提供异步接口, TPS 问题让他们内部解决, 你就可以看情况把中间没用步骤都砍掉.
你是在自己封装类似 Spring for Kafka 这样的框架吗? 那就照着 Spring for Kafka 的设计思路抄, 按照你们的需求精简一下.
不要在框架侧依赖这些配置以达到既定的框架逻辑, 这些配置都是业务侧来根据业务情况配置的, 你在框架层面能获取到的信息必定小于业务侧, 这些配置是给业务侧介入底层的手段.
框架需要做的是隐藏消费者创建的细节, 消息路由到消息处理方法的细节, 获取\提交消息的细节等各种可以封装的细节, 提供给业务侧封装好的接口或者使用方法就好.
你的设计应该是为每个消费者创建一个线程, 这个线程死循环 poll 以及 poll 之后对消息的路由以及处理, 消费完自动就该 poll, poll 之后就开始消费, 根本不存在需要限制速率的地方, 消费速度就是速率, 需要提高并发度只需要控制创建的消费者数量就行(当然要有对应的分区数量).
我看你的文章应该是给每一个消费者配了一个线程池, 路子是错的, 先不谈速率问题, 在业务侧按分区顺序消费都已经没有办法了, 同一个分区的消息都被线程池给并行了.
如果有我误解的地方欢迎你再回复我.
kafka 本身就是 poll 模型, 处理速率和并发度都是预先设置的, 理论上不搞花活是不会出现需要控制消费者消费速率问题的, 极大概率是错误设计或者滥用了
如果消费者处理的消息是无关的, 那么每个消费者消费单个消息只需要加分区和消费者数量
如果消费者处理的消息是相关的, 也就是需要一批一起处理的(如果相关,在生产者就应该打包成一条消息), 那为什么又加个线程池并行跑..
暂且不论是否合理, 你的 offset 是怎么提交的, 在批量用线程池的情况下?
如果是等待这批消息处理完统一提交, 那么通过限制线程池能到达限制效果
如果是丢到线程池中后直接提交, 直接改方案, 这个方案基本等于错误设计
技术上通过各种方式都能实现目标, 但是真的需要在技术面解决吗?
如果方便可以发下业务场景一起探讨一下
有没有一种可能, 你的 Java 应用=线程池, 创建的消费者数量=线程数, topic=任务队列, 所以在消费者这里再加一个内存的线程池是为了什么?
至于控制 kafka 消费者查一下 pause 相关的 api, 能暂停或者说挂起消费者