V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
fanhaipeng0403
V2EX  ›  Python

分享个 celery 的监控脚本吧

  •  1
     
  •   fanhaipeng0403 · 2019-01-03 11:00:51 +08:00 · 2257 次点击
    这是一个创建于 2207 天前的主题,其中的信息可能已经有所发展或是发生改变。
    # coding=utf-8
    import celery
    import celery.states
    import celery.events
    import collections
    from itertools import chain
    import logging
    import prometheus_client
    import sys
    from threading import Thread
    import time
    import json
    import os
    from app.tasks import celery as app
    
    # Metrics exported by this monitor.
    
    # Number of workers currently online.
    WORKERS = prometheus_client.Gauge('celery_workers', 'Number of alive workers')
    
    # Number of tasks in each state.
    TASKS = prometheus_client.Gauge('celery_tasks', 'Number of tasks per state', ['state'])
    # Number of tasks per state and task name: an overview of all Celery tasks.
    TASKS_NAME = prometheus_client.Gauge('celery_tasks_by_name', 'Number of tasks per state and name', ['state', 'name'])
    # Execution time of each task; tracks the performance of the task itself (used to guide SQL optimisation).
    TASKS_RUNTIME = prometheus_client.Histogram('celery_tasks_runtime_seconds', 'Task runtime (seconds)', ['name'])
    # Time each task waits before starting; shows blocking and helps tune the number of workers.
    LATENCY = prometheus_client.Histogram('celery_task_latency', 'Seconds between a task is received and started.')
    
    # Module-level logger shared by the monitoring classes below.
    logger = logging.getLogger(__name__)
    
    
    class WorkerMonitoring:
        """Keep the ``WORKERS`` gauge in sync with the workers answering a ping."""

        def __init__(self, app):
            self._app = app

        def run(self):
            """Refresh the worker count forever, sleeping 5 seconds between rounds."""
            while True:
                self.update_workers_count()
                time.sleep(5)

        def update_workers_count(self):
            """Ping the workers (5 second timeout) and publish how many replied.

            Any failure is logged with its traceback and otherwise ignored, so
            the gauge simply keeps its previous value.
            """
            try:
                replies = self._app.control.ping(timeout=5)
                WORKERS.set(len(replies))
            except Exception:
                logger.exception("Error while pinging workers")
    
    
    class TaskMonitoring:
        """Consume Celery events and mirror task activity into Prometheus metrics.

        Tasks that have not finished yet are tracked in a Celery events ``State``
        object; finished tasks are removed from it and counted on ``TASKS``.
        """

        def __init__(self, app):
            self._app = app
            self._state = self._app.events.State()
            # Every state / (state, name) pair seen so far, so that a gauge whose
            # tasks have all moved on is written back to 0 instead of going stale.
            self._known_states = set()
            self._known_states_names = set()

        def run(self):
            """Thread entry point: capture events until the process exits."""
            self._monitor()

        def _process_event(self, event):
            """Handle one event delivered by the receiver.

            The receiver is registered with the ``'*'`` handler, so worker events
            arrive here as well. Only ``task-*`` events are processed; anything
            else (heartbeats, ``worker-online``, ...) is ignored, as are task
            event names missing from ``TASK_EVENT_TO_STATE``.
            """
            event_type = event['type']
            if not event_type.startswith('task-'):
                return

            # Events may arrive concurrently, so hold the state's lock.
            with self._state._mutex:
                try:
                    # Strip the 'task-' prefix to get the mapping key.
                    state = celery.events.state.TASK_EVENT_TO_STATE[event_type[5:]]
                except KeyError:
                    logger.debug("Ignoring unknown task event type %r", event_type)
                    return

                if state == celery.states.STARTED:
                    # Measure how long the task waited before starting.
                    self._observe_latency(event)

                self._collect_tasks(event, state)

        def _observe_latency(self, event):
            """Record the delay between a task being received and being started.

            Nothing is recorded when the task is unknown to the local state or
            when its stored state is not ``RECEIVED``.
            """
            try:
                previous = self._state.tasks[event['uuid']]
            except KeyError:
                return
            if previous.state == celery.states.RECEIVED:
                LATENCY.observe(
                    event['local_received'] - previous.local_received)

        def _collect_tasks(self, event, state):
            """Update the counters for ``event`` and refresh the unfinished-task gauges."""
            if state in celery.states.READY_STATES:
                self._incr_ready_task(event, state)
            else:
                # Let the Celery state object track the unfinished task.
                self._state._event(event)

            self._collect_unready_tasks()

        def _incr_ready_task(self, event, state):
            """Count a finished task ('FAILURE', 'REVOKED', 'SUCCESS') and record its runtime."""
            TASKS.labels(state=state).inc()
            # NOTE(review): TASKS_NAME is not incremented for finished tasks here,
            # so its ready-state series never change in this class - confirm
            # that this is intended.
            try:
                # The task is finished: drop it from the in-progress state.
                name = self._state.tasks.pop(event['uuid']).name
                runtime = event.get('runtime')

                if name is not None and runtime is not None:
                    TASKS_RUNTIME.labels(name=name).observe(runtime)
            except (KeyError, AttributeError):
                # Best effort: the task may never have been seen by this monitor.
                pass

        def _collect_unready_tasks(self):
            """Publish gauges for unfinished tasks.

            Covers the 'PENDING', 'RECEIVED', 'REJECTED', 'RETRY' and 'STARTED'
            states tracked in the local state object.
            """
            tasks = self._state.tasks.values()

            per_state = collections.Counter(t.state for t in tasks)
            self._known_states.update(per_state)
            for task_state in self._known_states:
                # Counter returns 0 for states that currently have no task.
                TASKS.labels(state=task_state).set(per_state[task_state])

            per_state_name = collections.Counter(
                (t.state, t.name) for t in tasks if t.name)
            self._known_states_names.update(per_state_name)
            for key in self._known_states_names:
                task_state, task_name = key
                TASKS_NAME.labels(state=task_state, name=task_name).set(per_state_name[key])

        def _monitor(self):
            """Capture events from the broker, reconnecting 5 seconds after any failure."""
            while True:
                try:
                    with self._app.connection() as conn:
                        # Receive every event from the broker and hand it to
                        # _process_event.
                        logger.info("Try to connect to broker")
                        recv = self._app.events.Receiver(conn, handlers={'*': self._process_event, })

                        setup_metrics(self._app)
                        recv.capture(limit=None, timeout=None, wakeup=True)
                        # NOTE(review): this is only reached once capture() returns.
                        logger.info("Connected to broker")

                except Exception:
                    logger.exception("Queue connection failed")
                    setup_metrics(self._app)
                    time.sleep(5)
    
    
    def setup_metrics(app):
        """Reset the worker and task gauges to zero.

        ``WORKERS`` is always set to 0. If the registered tasks can be queried
        from the workers, ``TASKS`` is zeroed for every Celery state and
        ``TASKS_NAME`` for every (state, registered task name) pair. If the
        query fails for any reason, only the label combinations that already
        exist on ``TASKS`` and ``TASKS_NAME`` are zeroed.

        :param app: the Celery application whose workers are inspected.
        """
        WORKERS.set(0)
        try:
            registered_tasks = app.control.inspect().registered_tasks().values()
        except Exception:
            for gauge in (TASKS, TASKS_NAME):
                for metric in gauge.collect():
                    for sample in metric.samples:
                        # sample[1] is the labels dict. Indexing instead of
                        # unpacking three names keeps this working when the
                        # sample tuple carries more than three fields.
                        gauge.labels(**sample[1]).set(0)
        else:
            # Workers may register overlapping task lists; de-duplicate once
            # rather than once per state.
            task_names = set(chain.from_iterable(registered_tasks))

            # 'FAILURE', 'PENDING', 'RECEIVED', 'RETRY', 'REVOKED', 'STARTED', 'SUCCESS'
            for state in celery.states.ALL_STATES:
                TASKS.labels(state=state).set(0)
                for task_name in task_names:
                    TASKS_NAME.labels(state=state, name=task_name).set(0)
    
    
    class EnableEvents:
        """Repeatedly ask the workers to send events.

        Workaround noted by the author: Celery did not send events even with
        CELERY_SEND_EVENTS configured, so events are enabled through the control
        API instead.
        """

        def __init__(self, app):
            self._app = app

        def run(self):  # pragma: no cover
            """Send the enable-events command forever, every 5 seconds."""
            while True:
                try:
                    self.enable_events()
                except Exception:
                    # Use the module-level logger: this class has no ``log``
                    # attribute, so ``self.log`` raised AttributeError here and
                    # ended the thread on the first failure.
                    logger.exception("Error while trying to enable events")
                time.sleep(5)

        def enable_events(self):
            """Send a single enable-events control command through the app."""
            self._app.control.enable_events()
    
    def start_httpd(addr):
        """Serve the Prometheus metrics over HTTP.

        :param addr: listen address in ``host:port`` form, e.g. ``0.0.0.0:49792``.
        """
        pieces = addr.split(':')
        host, port = pieces
        banner = 'Starting HTTPD on {}:{}'.format(host, port)
        logging.info(banner)
        prometheus_client.start_http_server(int(port), host)
    
    
    def celery_monitoring():
        """Run the exporter: reset metrics, start the monitors, serve HTTP.

        Three daemon threads are started in order (event enabling, worker
        counting, task monitoring), then the metrics HTTP server is started on
        0.0.0.0:49792. The call then blocks joining the threads, whose loops
        never end.
        """
        setup_metrics(app)

        threads = []
        for monitor_cls in (EnableEvents, WorkerMonitoring, TaskMonitoring):
            thread = Thread(target=monitor_cls(app).run)
            thread.daemon = True
            thread.start()
            threads.append(thread)

        start_httpd('0.0.0.0:49792')

        # Join in reverse start order: task monitor, worker monitor, event enabler.
        for thread in reversed(threads):
            thread.join()
    
    # NOTE(review): `manager` is not defined in the visible code - presumably a
    # command manager object from the project's manage.py; confirm.
    @manager.command
    def start_celery_monitoring():
        """Start the Celery monitoring exporter; blocks while the monitors run.

        Typical invocation, detached from the terminal:

            nohup python manage.py start_celery_monitoring &
        """
        celery_monitoring()
    
    4 条回复    2019-01-03 12:47:43 +08:00
    leisurelylicht
        1
    leisurelylicht  
       2019-01-03 11:21:16 +08:00 via iPhone
    推荐楼主传到 github 上给我们个链接就行了
    zhoudaiyu
        2
    zhoudaiyu  
       2019-01-03 12:18:24 +08:00 via iPhone
    兄弟是看我帖子了吗
    fanhaipeng0403
        3
    fanhaipeng0403  
    OP
       2019-01-03 12:46:16 +08:00
    @zhoudaiyu 看了~我最近也要处理这个事情~
    Nick2VIPUser
        4
    Nick2VIPUser  
       2019-01-03 12:47:43 +08:00 via iPhone
    @leisurelylicht 最好加上 readme ~
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2786 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 26ms · UTC 14:39 · PVG 22:39 · LAX 06:39 · JFK 09:39
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.