Celery notes

Basic concepts

Queue, exchange, routing_key
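
To make the three terms concrete, here is a toy in-memory model of a direct exchange. It is not Celery or RabbitMQ code: the class `DirectExchange` and the queue and key names are hypothetical, and only the matching rule (a message reaches the queues bound with the same routing key) reflects how a direct exchange behaves.

```python
from collections import defaultdict


class DirectExchange:
    """Toy in-memory model of a direct exchange (illustration only)."""

    def __init__(self, name):
        self.name = name
        self.bindings = defaultdict(list)  # routing_key -> bound queues

    def bind(self, queue, routing_key):
        # A binding ties a queue to the exchange under one routing key.
        self.bindings[routing_key].append(queue)

    def publish(self, message, routing_key):
        # Deliver only to queues whose binding key equals the routing key.
        for queue in self.bindings.get(routing_key, []):
            queue.append(message)


default_queue, video_queue = [], []
exchange = DirectExchange("tasks")
exchange.bind(default_queue, routing_key="default")
exchange.bind(video_queue, routing_key="video")

exchange.publish("resize_image", routing_key="default")
exchange.publish("encode_clip", routing_key="video")
exchange.publish("orphan", routing_key="unbound")  # no binding, so it is dropped
```

In this model the producer only ever names an exchange and a routing key; which queue receives the message is decided by the bindings.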

Broadcast messages

http://docs.celeryproject.org/en/4.0/userguide/routing.html#broadcast

Demo

from kombu.common import Broadcast

app.conf.task_queues = (Broadcast('broadcast_tasks'),)
app.conf.task_routes = {'tasks.reload_cache': {'queue': 'broadcast_tasks'}}
  • 4.*: could not write to RabbitMQ
  • 3.*: tested successfully
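
A `Broadcast` queue is backed by a fanout exchange, so every worker consuming from it receives its own copy of the task. The sketch below models that delivery rule in plain Python; `FanoutExchange` and the worker names are hypothetical and no broker is involved.

```python
class FanoutExchange:
    """Toy model of a fanout exchange: routing keys are ignored."""

    def __init__(self):
        self.queues = []

    def bind(self, queue):
        self.queues.append(queue)

    def publish(self, message):
        # Every bound queue gets its own copy of the message.
        for queue in self.queues:
            queue.append(message)


# One private queue per worker, all bound to the same exchange.
worker_queues = {name: [] for name in ("worker1", "worker2", "worker3")}
broadcast_tasks = FanoutExchange()
for queue in worker_queues.values():
    broadcast_tasks.bind(queue)

broadcast_tasks.publish("tasks.reload_cache")
```

This is why a broadcast route suits tasks such as `tasks.reload_cache`, where each worker has to act on the message rather than just one of them.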

Custom control commands

  1. Define

    from celery.worker.control import Panel

    @Panel.register
    def increase_prefetch_count(state, n=1):
        state.consumer.qos.increment_eventually(n)
        return {'ok': 'prefetch count incremented'}
  2. Call

    # The command name must match a registered control command.
    app.control.broadcast('checkVersion', kwargs=kwargs, destination=destination)
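
Since a control handler is an ordinary function of `state`, its logic can be checked without a running worker. The sketch below repeats the handler body without the registration decorator and drives it with a stub; `FakeQoS` and the `SimpleNamespace` state are hypothetical stand-ins, not Celery objects.

```python
from types import SimpleNamespace


class FakeQoS:
    """Hypothetical stand-in for state.consumer.qos; records increments."""

    def __init__(self):
        self.pending = 0

    def increment_eventually(self, n=1):
        self.pending += n


def increase_prefetch_count(state, n=1):
    # Same body as the handler above, without the registration decorator.
    state.consumer.qos.increment_eventually(n)
    return {'ok': 'prefetch count incremented'}


# Build a stub with the same attribute path the handler walks.
state = SimpleNamespace(consumer=SimpleNamespace(qos=FakeQoS()))
reply = increase_prefetch_count(state, n=3)
```

The returned dict is what each worker would send back as its reply to the broadcast.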

Event handling

flower event

import logging
import sys

from celery.events import EventReceiver
from kombu import BrokerConnection

import celeryconfig  # the project's own settings module

logging.info('start monitoring celery events')
connection = BrokerConnection(celeryconfig.BROKER_URL)
...
def on_event(event):
    logging.info('on event')

...
while True:
    try:
        with connection as conn:
            # Map each event type to its handler function.
            recv = EventReceiver(conn,
                                 handlers={
                                     'task-failed': on_task_failed,
                                     'task-succeeded': on_task_succeeded,
                                     'task-sent': on_task_sent,
                                     'task-received': on_task_received,
                                     'task-revoked': on_task_revoked,
                                     'task-started': on_task_started,
                                     'task-custom': on_task_custom,
                                     # '*': on_event,
                                     'worker-heartbeat': on_worker_heartbeat,
                                     'worker-online': on_worker_online,
                                     'worker-offline': on_worker_offline,
                                     # self-defined events
                                     'worker-reg': on_worker_reg,
                                     'task-log': on_event,
                                 })
            # Block and dispatch events until interrupted.
            recv.capture(limit=None, timeout=None)
    except (KeyboardInterrupt, SystemExit):
        print('EXCEPTION KEYBOARD INTERRUPT')
        sys.exit()
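
The handler bodies are elided in the listing above. As a rough sketch of what one might look like: each handler receives the event as a dict that carries a `type` field, and a `task-failed` event also carries the task `uuid` and the `exception` text. The bookkeeping below (`event_counts`, `failed_tasks`) and the sample event are hypothetical, not taken from the original code.

```python
import logging
from collections import Counter

event_counts = Counter()  # number of events seen, keyed by event type
failed_tasks = {}         # task uuid -> exception text


def on_event(event):
    """Catch-all handler: count the event by its 'type' field."""
    event_counts[event['type']] += 1
    logging.info('on event: %s', event['type'])


def on_task_failed(event):
    """Record which task failed and why, then count the event."""
    on_event(event)
    failed_tasks[event['uuid']] = event.get('exception')


# Hand-built sample event (assumed values) to exercise the handler offline.
sample = {
    'type': 'task-failed',
    'uuid': 'abc123',
    'exception': "KeyError('x')",
    'hostname': 'worker1@example',
    'timestamp': 0.0,
}
on_task_failed(sample)
```

Keeping handlers as plain functions over dicts means they can be tested with hand-built events before being wired into `EventReceiver`.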

Problems

1. RabbitMQ unacked messages

An excessive message volume in RabbitMQ may cause


References