m0d9

celery 学习笔记

基础概念

Queue,exchange,routing_key

Broadcast

http://docs.celeryproject.org/en/4.0/userguide/routing.html#broadcast

Demo

1
2
3
4
from kombu.common import Broadcast
app.conf.task_queues = (Broadcast('broadcast_tasks'),)
app.conf.task_routes = {'tasks.reload_cache': {'queue': 'broadcast_tasks'}}
  • 4.*版本不能够写入rabbitmq
  • 3.*测试成功

自定义control命令

  1. 定义

    1
    2
    3
    4
    5
    6
    from celery.worker.control import Panel
    @Panel.register
    def increase_prefetch_count(state, n=1):
    state.consumer.qos.increment_eventually(n)
    return {'ok': 'prefetch count incremented'}
  2. 调用

    1
    app.control.broadcast('checkVersion',kwargs=kwargs,destination=destination)

event

flower event

参考