V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
129duckflew
V2EX  ›  程序员

SpringBoot 集成 MQTT 并发发送消息异常有大佬帮忙看看吗

  •  
  •   129duckflew · 25 天前 · 1319 次点击

    我在 github 搜索到一个有点年头了 但是可能有关的 issue , 这个 issue 已经关闭了,本来还想留言来着 https://github.com/eclipse/paho.mqtt.java/issues/323
    我使用 Spring-Integration-mqtt 模块发送和接收 MQTT 消息,发生以下异常:

    2024-10-22 10:56:45.161 [][] ERROR o.s.i.handler.LoggingHandler:250 - org.springframework.messaging.MessageHandlingException: Failed to publish to MQTT in the [bean 'mqttOutboundHandler' for component 'mqttOutboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#1'; defined in: 'class path resource [com/rms/config/mqtt/MqttConfig.class]'; from source: 'com.rms.config.mqtt.MqttConfig.mqttOutboundHandler(org.eclipse.paho.mqttv5.client.MqttConnectionOptions)'], failedMessage=GenericMessage [payload={"value":[],"unitId":741,"fieldName":"landingCall-down-front-1-32","isRunningData":false,"isError":false}, headers={replyChannel=nullChannel, errorChannel=, mqtt_qos=0, id=1237ba75-e408-10ff-e322-5f692dd8970e, mqtt_topic=status/741/landingCall-down-front-1-32, timestamp=1729565799271}]
        at org.springframework.integration.mqtt.outbound.Mqttv5PahoMessageHandler.publish(Mqttv5PahoMessageHandler.java:283)
        at org.springframework.integration.mqtt.outbound.Mqttv5PahoMessageHandler.handleMessageInternal(Mqttv5PahoMessageHandler.java:222)
        at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
        at org.springframework.integration.handler.AbstractMessageHandler.handleWithMetrics(AbstractMessageHandler.java:90)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:70)
        at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:132)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
        at org.springframework.integration.dispatcher.UnicastingDispatcher$1.run(UnicastingDispatcher.java:129)
        at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:56)
        at java.base/java.util.concurrent.ThreadPerTaskExecutor$TaskRunner.run(ThreadPerTaskExecutor.java:314)
        at java.base/java.lang.VirtualThread.run(VirtualThread.java:311)
    Caused by: Internal error, caused by no new message IDs being available (32001)
        at org.eclipse.paho.mqttv5.client.internal.ExceptionHelper.createMqttException(ExceptionHelper.java:32)
        at org.eclipse.paho.mqttv5.client.internal.ClientState.getNextMessageId(ClientState.java:1454)
        at org.eclipse.paho.mqttv5.client.internal.ClientState.send(ClientState.java:511)
        at org.eclipse.paho.mqttv5.client.internal.ClientComms.internalSend(ClientComms.java:155)
        at org.eclipse.paho.mqttv5.client.internal.ClientComms.sendNoWait(ClientComms.java:218)
        at org.eclipse.paho.mqttv5.client.MqttAsyncClient.publish(MqttAsyncClient.java:1530)
        at org.eclipse.paho.mqttv5.client.MqttAsyncClient.publish(MqttAsyncClient.java:1499)
        at org.springframework.integration.mqtt.outbound.Mqttv5PahoMessageHandler.publish(Mqttv5PahoMessageHandler.java:271)
        ... 10 more
    

    环境:

    • SpringBoot 3.2.5
    • java21
    • PahoMQTT1.2.5 下面是我的核心逻辑。我使用 Spring-Integration 集成流程配置了两个 Mqtt 客户端。一个用于接收信息,另一个用于发送消息。发送消息的频率约为每秒 5000 条消息。
    mqtt:
      client-id-inbound: rms-inbound
      client-id-outbound: rms-outbound
      url: tcp://127.0.0.1:1883
      username: rms
      password: 123456
    
    import java.util.concurrent.Executors;
    
    
    @Configuration
    @IntegrationComponentScan("com.rms.config")
    @Slf4j
    @ConfigurationProperties(prefix = "mqtt")
    @Data
    public class MqttConfig {
        private String clientIdInbound;
        private String clientIdOutbound;
        private String url;
        private String password;
        private String username;
        @Bean
        public MqttConnectionOptions mqttConnectOptions(){
            MqttConnectionOptions options = new MqttConnectionOptions();
            options.setServerURIs(new String[] { url});
            options.setUserName(username);
            options.setPassword(password.getBytes());
            options.setAutomaticReconnect(true);
            return options;
        }
        @Bean
        public ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager(MqttConnectionOptions options) {
            Mqttv5ClientManager clientManager = new Mqttv5ClientManager(options, clientIdInbound);
            clientManager.setPersistence(new MqttDefaultFilePersistence());
            return clientManager;
        }
    
        @Bean
        public SimpleMessageConverter simpleMessageConverter(){
            return new SimpleMessageConverter();
        }
    
        @Bean
        public MessageHandler mqttOutboundHandler(MqttConnectionOptions connectionOptions) {
            Mqttv5PahoMessageHandler messageHandler = new Mqttv5PahoMessageHandler(connectionOptions,clientIdOutbound);
            messageHandler.setAsync(true);
            messageHandler.setDefaultTopic("defaultTopic");
            messageHandler.setDefaultQos(MqttQoS.AT_MOST_ONCE.value());
            messageHandler.setConverter(simpleMessageConverter());
            return messageHandler;
        }
    
    
        @Bean
        public IntegrationFlow mqttOutboundFlow(MessageHandler mqttOutboundHandler){
            return IntegrationFlow.from("mqttOutboundChannel")
                    .channel(MessageChannels.executor(Executors.newVirtualThreadPerTaskExecutor()))
                    .handle(mqttOutboundHandler)
                    .get();
        }
    
        @Bean
        public IntegrationFlow statusInboundFlow(ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManage){
            Mqttv5PahoMessageDrivenChannelAdapter messageProducer  =
                    new Mqttv5PahoMessageDrivenChannelAdapter(clientManage, "status/+/#");
            return IntegrationFlow.from(messageProducer)
                    .channel(MessageChannels.executor(Executors.newVirtualThreadPerTaskExecutor()))
                    .transform(Transformers.objectToString())
                    .transform(Transformers.fromJson(Prop.class))
                    .route(Prop.class,prop->{
                        if (Boolean.TRUE.equals(prop.getIsError())){
                            return "errorDataChannel";
                        }
                        else if (Boolean.TRUE.equals(prop.getIsRunningData())){
                            return "runningDataChannel";
                        }
                        else {
                            return "discardChannel";
                        }
                    })
                    .get();
        }
        @Bean
        public CanProtocolLoader canProtocolLoader(){
            return new CanProtocolLoader();
        }
        @Bean
        public UnitErrorHandler errorLogHandler(LogService logService ){
            return new UnitErrorHandler(logService);
        }
        @Bean
        public UnitRunningDataHandler unitRunningDataHandler(LogService logService){
            return new UnitRunningDataHandler(logService);
        }
        @Bean
        public IntegrationFlow errorLogChannelFlow(UnitErrorHandler unitErrorHandler){
            return IntegrationFlow.from("errorDataChannel")
                    .channel(MessageChannels.executor(Executors.newVirtualThreadPerTaskExecutor()))
                    .handle(unitErrorHandler)
                    .get();
        }
        @Bean
        public IntegrationFlow runningDataChannelFlow(UnitRunningDataHandler runningDataHandler){
            return IntegrationFlow.from("runningDataChannel")
                    .channel(MessageChannels.executor(Executors.newVirtualThreadPerTaskExecutor()))
                    .handle(runningDataHandler)
                    .get();
        }
      
        @Bean
        public IntegrationFlow discardChannelFlow(){
            return IntegrationFlow.from("discardChannel")
                    .channel(MessageChannels.executor(Executors.newVirtualThreadPerTaskExecutor()))
                    .handle(message -> {
                    })
                    .get();
        }
    }
    
    @Slf4j
    @RequiredArgsConstructor
    public class UnitErrorHandler implements MessageHandler {
        private final LogService logService;
    
        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            Prop<?> prop = (Prop<?>) message.getPayload();
            logService.saveErrorLog(prop);
        }
    
    }
    

    我使用 MessagingGateway 注释将消息定向到我的 mqttOutboundChannel ,以便我可以使用 MqttGateway 发送 mqtt 消息

    
    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MqttGateway {
        void sendToMqtt(String data);
    
        void sendToTopic(String payload, @Header(MqttHeaders.TOPIC)String topic);
    
        void sendToTopic(String payload, @Header(MqttHeaders.TOPIC)String topic,@Header(MqttHeaders.QOS ) int qos);
        void sendWithResp(String payload, @Header(MqttHeaders.TOPIC)String topic,@Header(MqttHeaders.RESPONSE_TOPIC) String responseTopic,@Header(MqttHeaders.QOS ) int qos);
    }
    
    

    当消息投递率很低,一般每秒不到 200 条消息时,我一开始提到的错误就不会出现,但是当我将消息投递率提高到 5000 条时,按照上面的配置就开始出现报错,另外如果我去掉了 mqtt 消息的入站部分,也就是上面的代码中引入 的 statusInboundFlow 集成 Flow 后 发送消息又不报错了

    14 条回复    2024-10-25 09:40:35 +08:00
    maokg
        1
    maokg  
       25 天前
    Caused by: Internal error, caused by no new message IDs being available (32001),好像是 id 的问题
    129duckflew
        2
    129duckflew  
    OP
       25 天前
    @maokg 嗯,应该在发送大量消息的时候因为某些原因耗尽了可用的 ID 分配,看了一下源码的 at org.eclipse.paho.mqttv5.client.internal.ClientState.getNextMessageId(ClientState.java:1454) 这里从 inUseIdMap 查找三次如果找不到可以使用的 id 就会抛出这个异常,但是我不明白的点是什么原因导致 Client 一直不释放这些 id
    ZGame
        3
    ZGame  
       25 天前
    @129duckflew 我猜是背压问题 我感觉可能是你代码写的有问题
    ZGame
        4
    ZGame  
       25 天前
    每秒 5000 条...
    maokg
        5
    maokg  
       25 天前
    @129duckflew 网络差? client 没收到然后没确认( Qos 1 or 2)
    129duckflew
        6
    129duckflew  
    OP
       25 天前
    @maokg 代码里面发送的都是 QOS 0 消息
    129duckflew
        7
    129duckflew  
    OP
       25 天前
    @ZGame paho 客户端似乎没提供背压机制吧,背压我确实不太了解 只在响应式流里面听到过
    ZGame
        8
    ZGame  
       25 天前
    @129duckflew #7 mqtt 服务端借助第三方吧 阿里云或者自己搭建的,应该就没有这个问题了?像生产数据太快来不及消费吧
    129duckflew
        9
    129duckflew  
    OP
       25 天前
    @ZGame 我感觉不是数据没有消费的问题,我在帖子里面提到,我把入站监听注释掉 只发送不接受 就可以正常发送不报错的
    ZGame
        10
    ZGame  
       25 天前
    @129duckflew #9 对阿这就是背压问题啊 生产者生产太快消费者来不及消费不就内存溢出 oom 背压问题了吗...
    ZGame
        11
    ZGame  
       25 天前
    @129duckflew #9 所以你应该做的是 比如说问问 gpt ,有没有开源的 mqtt 服务端 能够把消息聚集起来,然后你客户端按一定频率去消费 ,多的选择丢弃或者 降低发送频率,
    huifer
        12
    huifer  
       23 天前
    能不能提供一个 DEMO 发给我,我正好在写这个方面的书籍。 交流可以加一加我 cWZ5ZDA5NQ==
    asp1111
        13
    asp1111  
       23 天前
    为啥没用 netty ,至少懂的的人多一些
    129duckflew
        14
    129duckflew  
    OP
       23 天前
    @huifer 我已经编写了一个最小 demo 请查看: https://github.com/129duckflew/inte-paho-test.git
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2405 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 32ms · UTC 01:57 · PVG 09:57 · LAX 17:57 · JFK 20:57
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.