Python version: 3.10.1 or 3.10.2
Code:
def main():
    log_listener = setup_logging(log_filename)
    e = asyncio.Event()
    consumer = Consumer(e)
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
    for sig in signals:
        loop.add_signal_handler(
            sig, lambda s=sig: asyncio.create_task(shutdown(s, loop, consumer))
        )
    tasks = consumer.run()
    try:
        for name, task in tasks.items():
            loop.create_task(task, name=name)
        loop.run_forever()
    finally:
        loop.close()
        log_listener.stop()
The consumer.run() method returns a dict of type Dict[str, Coroutine]. At first I suspected my own coroutine implementation was causing the high CPU, so I removed the coroutines from tasks one by one; even when tasks came back empty, CPU usage was still 100%.
Here is what cProfile shows:
   Ordered by: cumulative time
   List reduced from 3038 to 10 due to restriction <10>
   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
    423/1    0.005    0.000   44.862   44.862 {built-in method builtins.exec}
        1    0.000    0.000   44.862   44.862 myscript.py:1(<module>)
        1    0.000    0.000   44.219   44.219 myscript.py:54(main)
        1    0.000    0.000   44.017   44.017 /home/xxx/.pyenv/versions/3.10.1/lib/python3.10/asyncio/base_events.py:582(run_forever)
        3    0.000    0.000   44.016   14.672 /home/xxx/.pyenv/versions/3.10.1/lib/python3.10/asyncio/base_events.py:1806(_run_once)
        3    0.000    0.000   43.983   14.661 /home/xxx/.pyenv/versions/3.10.1/lib/python3.10/selectors.py:452(select)
        3   43.983   14.661   43.983   14.661 {method 'poll' of 'select.epoll' objects}
    484/4    0.006    0.000    0.642    0.161 <frozen importlib._bootstrap>:1022(_find_and_load)
    483/4    0.003    0.000    0.642    0.161 <frozen importlib._bootstrap>:987(_find_and_load_unlocked)
    447/5    0.003    0.000    0.641    0.128 <frozen importlib._bootstrap>:664(_load_unlocked)
Has anyone here run into something similar? Or is my usage wrong?
    def _start_kafka_client(self) -> None:
        logging.debug(f"[_start_kafka_client] id(event)={id(self._event)}")
        i = 0
        try:
            while not self._event.is_set():
                msg_packs = self._kafka_client.poll(
                    timeout_ms=1000,
                    max_records=5000,
                )
                if not msg_packs:
                    continue
                # msg_packs maps TopicPartition -> List[ConsumerRecord]
                tp: kafka.TopicPartition
                msgs: List[ConsumerRecord]
                for tp, msgs in msg_packs.items():
                    try:
                        self._data_queue.put_nowait(msgs)
                    except asyncio.QueueFull:
                        # queue full: back off briefly, then keep polling
                        logging.debug("data queue is full")
                        time.sleep(1)
                        continue
                    i += len(msgs)
                    if i % 1000 == 0:
                        logging.info(f"count of msgs: {i}")
        except Exception as e:
            logging.error(f"error: {e}")
        except Exception as e:
            logging.error(f"error: {e}")
        finally:
            logging.debug("_start_kafka_client terminates")
#1 netcan 2022-04-07 17:20:05 +08:00
If tasks is empty, `loop.run_forever()` just returns right away. Suggest posting the complete code, e.g. Consumer.

#2 Nitroethane OP
@netcan #1 Added. It just receives data from kafka, parses it, and writes it into es.

#3 makerbi 2022-04-07 18:24:09 +08:00
It's probably because there is no sleep inside the while loop; even time.sleep(0.1) is better than no sleep at all.

#4 jenlors 2022-04-07 18:32:26 +08:00
When reading messages from kafka, set block to True so the call itself blocks the loop.

#5 Nitroethane OP

#6 Richard14 2022-04-10 07:35:22 +08:00
The question is too long and lacks a minimal reproduction; the code in the OP has lots of unexplained pieces, and I really don't feel like reading it. It also smells a lot like an XY problem.

#7 Nitroethane OP
@Richard14 #6 Nobody is forcing you to read it 😁; if you don't want to read it, just don't reply :)

#8 lolizeppelin 2022-04-12 23:35:44 +08:00
Sleeping 0.1 s is not a good idea, and even 0.001 s is unsuitable. The usual approach is to listen on an event fd, so you can sleep and still respond promptly.